| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.reef.javabridge.generic; |
| |
| import org.apache.reef.driver.client.JobMessageObserver; |
| import org.apache.reef.driver.context.ActiveContext; |
| import org.apache.reef.driver.context.ClosedContext; |
| import org.apache.reef.driver.context.ContextMessage; |
| import org.apache.reef.driver.context.FailedContext; |
| import org.apache.reef.driver.evaluator.*; |
| import org.apache.reef.driver.restart.DriverRestarted; |
| import org.apache.reef.driver.task.*; |
| import org.apache.reef.io.network.naming.NameServer; |
| import org.apache.reef.javabridge.*; |
| import org.apache.reef.driver.restart.DriverRestartCompleted; |
| import org.apache.reef.runtime.common.driver.DriverStatusManager; |
| import org.apache.reef.runtime.common.driver.parameters.DefinedRuntimes; |
| import org.apache.reef.runtime.common.files.REEFFileNames; |
| import org.apache.reef.tang.annotations.Parameter; |
| import org.apache.reef.tang.annotations.Unit; |
| import org.apache.reef.util.Optional; |
| import org.apache.reef.util.logging.CLRBufferedLogHandler; |
| import org.apache.reef.util.logging.LoggingScope; |
| import org.apache.reef.util.logging.LoggingScopeFactory; |
| import org.apache.reef.wake.EventHandler; |
| import org.apache.reef.wake.remote.address.LocalAddressProvider; |
| import org.apache.reef.wake.remote.impl.ObjectSerializableCodec; |
| import org.apache.reef.wake.time.Clock; |
| import org.apache.reef.wake.time.event.Alarm; |
| import org.apache.reef.wake.time.event.StartTime; |
| import org.apache.reef.wake.time.event.StopTime; |
| import org.apache.reef.webserver.*; |
| |
| import javax.inject.Inject; |
| import javax.servlet.ServletException; |
| import javax.servlet.http.HttpServletResponse; |
| import java.io.*; |
| import java.nio.charset.Charset; |
| import java.nio.charset.StandardCharsets; |
| import java.text.DateFormat; |
| import java.text.SimpleDateFormat; |
| import java.util.*; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.logging.Handler; |
| import java.util.logging.Level; |
| import java.util.logging.Logger; |
| |
| /** |
| * Generic job driver for CLRBridge. |
| */ |
| @Unit |
| public final class JobDriver { |
| |
| private static final Logger LOG = Logger.getLogger(JobDriver.class.getName()); |
| /** |
| * String codec is used to encode the results |
| * before passing them back to the client. |
| */ |
| private static final ObjectSerializableCodec<String> JVM_CODEC = new ObjectSerializableCodec<>(); |
| private final InteropLogger interopLogger = new InteropLogger(); |
| private final NameServer nameServer; |
| private final String nameServerInfo; |
| private final HttpServer httpServer; |
| private final ActiveContextBridgeFactory activeContextBridgeFactory; |
| private final AllocatedEvaluatorBridgeFactory allocatedEvaluatorBridgeFactory; |
| |
| /** |
| * Wake clock is used to schedule periodical job check-ups. |
| */ |
| private final Clock clock; |
| /** |
| * Job observer on the client. |
| * We use it to send results from the driver back to the client. |
| */ |
| private final JobMessageObserver jobMessageObserver; |
| /** |
| * Job driver uses EvaluatorRequestor |
| * to request Evaluators that will run the Tasks. |
| */ |
| private final EvaluatorRequestor evaluatorRequestor; |
| |
| /** |
| * Driver status manager to monitor driver status. |
| */ |
| private final DriverStatusManager driverStatusManager; |
| |
| /** |
| * Factory to setup new CLR process configurations. |
| */ |
| private final CLRProcessFactory clrProcessFactory; |
| |
| /** |
| * Shell execution results from each Evaluator. |
| */ |
| private final List<String> results = new ArrayList<>(); |
| /** |
| * Map from context ID to running evaluator context. |
| */ |
| private final ConcurrentHashMap<String, ActiveContext> contexts = new ConcurrentHashMap<>(); |
| |
| private final REEFFileNames reefFileNames; |
| private final LocalAddressProvider localAddressProvider; |
| /** |
| * Logging scope factory that provides LoggingScope. |
| */ |
| private final LoggingScopeFactory loggingScopeFactory; |
| private final Set<String> definedRuntimes; |
| private final DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss"); |
| |
| private BridgeHandlerManager handlerManager = null; |
| private boolean isRestarted = false; |
| // We are holding on to following on bridge side. |
| // Need to add references here so that GC does not collect them. |
| private final ConcurrentHashMap<String, AllocatedEvaluatorBridge> allocatedEvaluatorBridges = |
| new ConcurrentHashMap<>(); |
| private EvaluatorRequestorBridge evaluatorRequestorBridge; |
| |
| |
/**
 * Job driver constructor.
 * All parameters are injected from TANG automatically.
 *
 * @param clock Wake clock used to schedule alarms.
 * @param httpServer HTTP server whose port is published and to which CLR HTTP handlers are added.
 * @param nameServer name server; its port becomes part of the name server info string.
 * @param jobMessageObserver is used to send messages back to the client.
 * @param evaluatorRequestor is used to request Evaluators.
 * @param driverStatusManager driver status manager.
 * @param loggingScopeFactory factory for the logging scopes opened by the handlers.
 * @param localAddressProvider supplies the local address used in the published endpoints.
 * @param activeContextBridgeFactory factory for the ActiveContext bridge objects handed to CLR.
 * @param reefFileNames supplies the path of the driver HTTP endpoint file.
 * @param allocatedEvaluatorBridgeFactory factory for the AllocatedEvaluator bridge objects handed to CLR.
 * @param clrProcessFactory factory for the evaluator process assigned to each allocated evaluator.
 * @param definedRuntimes set of runtime names bound to DefinedRuntimes.
 */
@Inject
JobDriver(final Clock clock,
          final HttpServer httpServer,
          final NameServer nameServer,
          final JobMessageObserver jobMessageObserver,
          final EvaluatorRequestor evaluatorRequestor,
          final DriverStatusManager driverStatusManager,
          final LoggingScopeFactory loggingScopeFactory,
          final LocalAddressProvider localAddressProvider,
          final ActiveContextBridgeFactory activeContextBridgeFactory,
          final REEFFileNames reefFileNames,
          final AllocatedEvaluatorBridgeFactory allocatedEvaluatorBridgeFactory,
          final CLRProcessFactory clrProcessFactory,
          @Parameter(DefinedRuntimes.class) final Set<String> definedRuntimes) {
  // Runtime services.
  this.clock = clock;
  this.httpServer = httpServer;
  this.nameServer = nameServer;
  this.jobMessageObserver = jobMessageObserver;
  this.evaluatorRequestor = evaluatorRequestor;
  this.driverStatusManager = driverStatusManager;
  this.loggingScopeFactory = loggingScopeFactory;
  this.localAddressProvider = localAddressProvider;
  this.reefFileNames = reefFileNames;
  this.definedRuntimes = definedRuntimes;

  // Factories for the objects handed across the bridge.
  this.activeContextBridgeFactory = activeContextBridgeFactory;
  this.allocatedEvaluatorBridgeFactory = allocatedEvaluatorBridgeFactory;
  this.clrProcessFactory = clrProcessFactory;

  // Derived value: "<local address>:<name server port>".
  this.nameServerInfo = localAddressProvider.getLocalAddress() + ":" + nameServer.getPort();
}
| |
| private void setupBridge() { |
| // Signal to the clr buffered log handler that the driver has started and that |
| // we can begin logging |
| LOG.log(Level.INFO, "Initializing CLRBufferedLogHandler..."); |
| try (LoggingScope lb = this.loggingScopeFactory.setupBridge()) { |
| final CLRBufferedLogHandler handler = getCLRBufferedLogHandler(); |
| if (handler == null) { |
| LOG.log(Level.WARNING, "CLRBufferedLogHandler could not be initialized"); |
| } else { |
| handler.setDriverInitialized(); |
| LOG.log(Level.INFO, "CLRBufferedLogHandler init complete."); |
| } |
| |
| final String portNumber = httpServer == null ? null : Integer.toString(httpServer.getPort()); |
| if (portNumber != null) { |
| try { |
| final File outputFileName = new File(reefFileNames.getDriverHttpEndpoint()); |
| BufferedWriter out = new BufferedWriter( |
| new OutputStreamWriter(new FileOutputStream(outputFileName), StandardCharsets.UTF_8)); |
| out.write(localAddressProvider.getLocalAddress() + ":" + portNumber + "\n"); |
| out.close(); |
| } catch (IOException ex) { |
| throw new RuntimeException(ex); |
| } |
| } |
| |
| this.evaluatorRequestorBridge = |
| new EvaluatorRequestorBridge(JobDriver.this.evaluatorRequestor, false, loggingScopeFactory, |
| JobDriver.this.definedRuntimes); |
| JobDriver.this.handlerManager = new BridgeHandlerManager(); |
| NativeInterop.clrSystemSetupBridgeHandlerManager(portNumber, |
| JobDriver.this.handlerManager, evaluatorRequestorBridge); |
| |
| try (LoggingScope lp = |
| this.loggingScopeFactory.getNewLoggingScope("setupBridge::clrSystemHttpServerHandlerOnNext")) { |
| final HttpServerEventBridge httpServerEventBridge = new HttpServerEventBridge("SPEC"); |
| NativeInterop.clrSystemHttpServerHandlerOnNext(JobDriver.this.handlerManager.getHttpServerEventHandler(), |
| httpServerEventBridge, this.interopLogger); |
| final String specList = httpServerEventBridge.getUriSpecification(); |
| LOG.log(Level.INFO, "Starting http server, getUriSpecification: {0}", specList); |
| if (specList != null) { |
| final String[] specs = specList.split(":"); |
| for (final String s : specs) { |
| final HttpHandler h = new HttpServerBridgeEventHandler(); |
| h.setUriSpecification(s); |
| this.httpServer.addHttpHandler(h); |
| } |
| } |
| } |
| } |
| LOG.log(Level.INFO, "CLR Bridge setup."); |
| } |
| |
| private CLRBufferedLogHandler getCLRBufferedLogHandler() { |
| for (final Handler handler : Logger.getLogger("").getHandlers()) { |
| if (handler instanceof CLRBufferedLogHandler) { |
| return (CLRBufferedLogHandler) handler; |
| } |
| } |
| return null; |
| } |
| |
| private void submitEvaluator(final AllocatedEvaluator eval, final EvaluatorProcess process) { |
| eval.setProcess(process); |
| LOG.log(Level.FINE, "Allocated Evaluator: {0}, total running running {1}.", |
| new Object[]{eval.getId(), JobDriver.this.contexts.size()}); |
| final long handler = JobDriver.this.handlerManager.getAllocatedEvaluatorHandler(); |
| if (0 == handler) { |
| throw new RuntimeException("Allocated Evaluator Handler not initialized by CLR."); |
| } |
| final AllocatedEvaluatorBridge allocatedEvaluatorBridge = |
| this.allocatedEvaluatorBridgeFactory.getAllocatedEvaluatorBridge(eval, this.nameServerInfo); |
| allocatedEvaluatorBridges.putIfAbsent(allocatedEvaluatorBridge.getId(), allocatedEvaluatorBridge); |
| NativeInterop.clrSystemAllocatedEvaluatorHandlerOnNext( |
| handler, allocatedEvaluatorBridge, this.interopLogger, this.nameServerInfo, eval.getId()); |
| LOG.log(Level.FINE, "End of JobDriver.Allocated Evaluator: {0}, time: {1}", |
| new Object[] {eval.getId(), dateFormat.format(new Date())}); |
| } |
| |
| private void handleFailedEvaluator(final FailedEvaluator eval, final boolean isRestartFailed) { |
| try (LoggingScope ls = loggingScopeFactory.evaluatorFailed(eval.getId())) { |
| LOG.log(Level.SEVERE, "FailedEvaluator", eval); |
| for (final FailedContext failedContext : eval.getFailedContextList()) { |
| final String failedContextId = failedContext.getId(); |
| LOG.log(Level.INFO, "removing context " + failedContextId + " from job driver contexts."); |
| JobDriver.this.contexts.remove(failedContextId); |
| } |
| final String message = "Evaluator " + eval.getId() + " failed with message: " |
| + eval.getEvaluatorException().getMessage(); |
| JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes(StandardCharsets.UTF_8)); |
| |
| if (isRestartFailed) { |
| evaluatorFailedHandlerWaitForCLRBridgeSetup( |
| JobDriver.this.handlerManager.getDriverRestartFailedEvaluatorHandler(), eval, isRestartFailed); |
| } else { |
| evaluatorFailedHandlerWaitForCLRBridgeSetup(JobDriver.this.handlerManager.getFailedEvaluatorHandler(), |
| eval, isRestartFailed); |
| } |
| } |
| } |
| |
| private void evaluatorFailedHandlerWaitForCLRBridgeSetup(final long handle, |
| final FailedEvaluator eval, |
| final boolean isRestartFailed) { |
| if (handle == 0) { |
| if (JobDriver.this.handlerManager != null) { |
| final String message = "No CLR FailedEvaluator handler was set, exiting now"; |
| LOG.log(Level.WARNING, message); |
| JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes(StandardCharsets.UTF_8)); |
| } else { |
| clock.scheduleAlarm(0, new EventHandler<Alarm>() { |
| @Override |
| public void onNext(final Alarm time) { |
| if (JobDriver.this.handlerManager != null) { |
| handleFailedEvaluatorInCLR(eval, isRestartFailed); |
| } else { |
| LOG.log(Level.INFO, "Waiting for CLR bridge to be set up"); |
| clock.scheduleAlarm(5000, this); |
| } |
| } |
| }); |
| } |
| } else{ |
| handleFailedEvaluatorInCLR(eval, isRestartFailed); |
| } |
| } |
| |
| private void handleFailedEvaluatorInCLR(final FailedEvaluator eval, final boolean isRestartFailed) { |
| final String message = "CLR FailedEvaluator handler set, handling things with CLR handler."; |
| LOG.log(Level.INFO, message); |
| final FailedEvaluatorBridge failedEvaluatorBridge = |
| new FailedEvaluatorBridge(eval, JobDriver.this.evaluatorRequestor, |
| JobDriver.this.isRestarted, loggingScopeFactory, activeContextBridgeFactory, JobDriver.this.definedRuntimes); |
| if (isRestartFailed) { |
| NativeInterop.clrSystemDriverRestartFailedEvaluatorHandlerOnNext( |
| JobDriver.this.handlerManager.getDriverRestartFailedEvaluatorHandler(), |
| failedEvaluatorBridge, JobDriver.this.interopLogger); |
| } else { |
| NativeInterop.clrSystemFailedEvaluatorHandlerOnNext( |
| JobDriver.this.handlerManager.getFailedEvaluatorHandler(), |
| failedEvaluatorBridge, |
| JobDriver.this.interopLogger); |
| } |
| |
| final int additionalRequestedEvaluatorNumber = failedEvaluatorBridge.getNewlyRequestedEvaluatorNumber(); |
| if (additionalRequestedEvaluatorNumber > 0) { |
| LOG.log(Level.INFO, "number of additional evaluators requested after evaluator failure: " + |
| additionalRequestedEvaluatorNumber); |
| } |
| |
| JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes(StandardCharsets.UTF_8)); |
| } |
| |
| /** |
| * Submit a Task to a single Evaluator. |
| */ |
| private void submit(final ActiveContext context) { |
| try { |
| LOG.log(Level.INFO, "Send task to context: {0}", new Object[]{context}); |
| if (JobDriver.this.handlerManager.getActiveContextHandler() == 0) { |
| throw new RuntimeException("Active Context Handler not initialized by CLR."); |
| } |
| final ActiveContextBridge activeContextBridge = activeContextBridgeFactory.getActiveContextBridge(context); |
| NativeInterop.clrSystemActiveContextHandlerOnNext(JobDriver.this.handlerManager.getActiveContextHandler(), |
| activeContextBridge, JobDriver.this.interopLogger); |
| } catch (final Exception ex) { |
| LOG.log(Level.SEVERE, "Fail to submit task to active context"); |
| context.close(); |
| throw new RuntimeException(ex); |
| } |
| } |
| |
| /** |
| * Handles AllocatedEvaluator: Submit an empty context. |
| */ |
| public final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> { |
| @Override |
| public void onNext(final AllocatedEvaluator allocatedEvaluator) { |
| try (LoggingScope ls = loggingScopeFactory.evaluatorAllocated(allocatedEvaluator.getId())) { |
| LOG.log(Level.INFO, "AllocatedEvaluatorHandler.OnNext"); |
| JobDriver.this.submitEvaluator(allocatedEvaluator, clrProcessFactory.newEvaluatorProcess()); |
| } |
| } |
| } |
| |
| /** |
| * Receive notification that a new Context is available. |
| */ |
| public final class ActiveContextHandler implements EventHandler<ActiveContext> { |
| @Override |
| public void onNext(final ActiveContext context) { |
| try (LoggingScope ls = loggingScopeFactory.activeContextReceived(context.getId())) { |
| LOG.log(Level.INFO, "ActiveContextHandler: Context available: {0}", |
| new Object[]{context.getId()}); |
| JobDriver.this.contexts.put(context.getId(), context); |
| JobDriver.this.submit(context); |
| } |
| } |
| } |
| |
| /** |
| * Receive notification that the Task has completed successfully. |
| */ |
| public final class CompletedTaskHandler implements EventHandler<CompletedTask> { |
| @Override |
| public void onNext(final CompletedTask task) { |
| LOG.log(Level.INFO, "Completed task: {0}", task.getId()); |
| try (LoggingScope ls = loggingScopeFactory.taskCompleted(task.getId())) { |
| // Take the message returned by the task and add it to the running result. |
| String result = "default result"; |
| try { |
| result = new String(task.get(), StandardCharsets.UTF_8); |
| } catch (final Exception e) { |
| LOG.log(Level.WARNING, "failed to decode task outcome"); |
| } |
| LOG.log(Level.INFO, "Return results to the client:\n{0}", result); |
| JobDriver.this.jobMessageObserver.sendMessageToClient(JVM_CODEC.encode(result)); |
| if (JobDriver.this.handlerManager.getCompletedTaskHandler() == 0) { |
| LOG.log(Level.INFO, "No CLR handler bound to handle completed task."); |
| } else { |
| LOG.log(Level.INFO, "CLR CompletedTaskHandler handler set, handling things with CLR handler."); |
| final CompletedTaskBridge completedTaskBridge = new CompletedTaskBridge(task, activeContextBridgeFactory); |
| NativeInterop.clrSystemCompletedTaskHandlerOnNext(JobDriver.this.handlerManager.getCompletedTaskHandler(), |
| completedTaskBridge, JobDriver.this.interopLogger); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Receive notification that the entire Evaluator had failed. |
| */ |
| public final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> { |
| @Override |
| public void onNext(final FailedEvaluator eval) { |
| JobDriver.this.handleFailedEvaluator(eval, false); |
| allocatedEvaluatorBridges.remove(eval.getId()); |
| } |
| } |
| |
| /** |
| * Receive notification that the entire Evaluator had failed on Driver Restart. |
| */ |
| public final class DriverRestartFailedEvaluatorHandler implements EventHandler<FailedEvaluator> { |
| @Override |
| public void onNext(final FailedEvaluator eval) { |
| JobDriver.this.handleFailedEvaluator(eval, true); |
| } |
| } |
| |
| final class HttpServerBridgeEventHandler implements HttpHandler { |
| private String uriSpecification; |
| |
| /** |
| * returns URI specification for the handler. |
| */ |
| @Override |
| public String getUriSpecification() { |
| return uriSpecification; |
| } |
| |
| public void setUriSpecification(final String s) { |
| uriSpecification = s; |
| } |
| |
| /** |
| * process http request. |
| */ |
| @Override |
| public void onHttpRequest(final ParsedHttpRequest parsedHttpRequest, final HttpServletResponse response) |
| throws IOException, ServletException { |
| LOG.log(Level.INFO, "HttpServerBridgeEventHandler onHttpRequest: {0}", parsedHttpRequest.getRequestUri()); |
| try (LoggingScope ls = loggingScopeFactory.httpRequest(parsedHttpRequest.getRequestUri())) { |
| final AvroHttpSerializer httpSerializer = new AvroHttpSerializer(); |
| final AvroHttpRequest avroHttpRequest = httpSerializer.toAvro(parsedHttpRequest); |
| |
| final String requestString = httpSerializer.toString(avroHttpRequest); |
| final byte[] requestBytes = requestString.getBytes(Charset.forName(AvroHttpSerializer.JSON_CHARSET)); |
| |
| try { |
| final HttpServerEventBridge httpServerEventBridge = new HttpServerEventBridge(requestBytes); |
| NativeInterop.clrSystemHttpServerHandlerOnNext(JobDriver.this.handlerManager.getHttpServerEventHandler(), |
| httpServerEventBridge, JobDriver.this.interopLogger); |
| final String responseBody = new String(httpServerEventBridge.getQueryResponseData(), "UTF-8"); |
| response.getWriter().println(responseBody); |
| LOG.log(Level.INFO, "HttpServerBridgeEventHandler onHttpRequest received response: {0}", responseBody); |
| } catch (final Exception ex) { |
| LOG.log(Level.SEVERE, "Fail to invoke CLR Http Server handler", ex); |
| throw new RuntimeException(ex); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Handle failed task. |
| */ |
| public final class FailedTaskHandler implements EventHandler<FailedTask> { |
| @Override |
| public void onNext(final FailedTask task) throws RuntimeException { |
| LOG.log(Level.SEVERE, "FailedTask received, will be handle in CLR handler, if set."); |
| if (JobDriver.this.handlerManager.getFailedTaskHandler() == 0) { |
| LOG.log(Level.SEVERE, "Failed Task Handler not initialized by CLR, fail for real."); |
| throw new RuntimeException("Failed Task Handler not initialized by CLR."); |
| } |
| try { |
| final FailedTaskBridge failedTaskBridge = new FailedTaskBridge(task, activeContextBridgeFactory); |
| NativeInterop.clrSystemFailedTaskHandlerOnNext(JobDriver.this.handlerManager.getFailedTaskHandler(), |
| failedTaskBridge, JobDriver.this.interopLogger); |
| } catch (final Exception ex) { |
| LOG.log(Level.SEVERE, "Fail to invoke CLR failed task handler"); |
| throw new RuntimeException(ex); |
| } |
| } |
| } |
| |
| /** |
| * Receive notification that the Task is running. |
| */ |
| public final class RunningTaskHandler implements EventHandler<RunningTask> { |
| @Override |
| public void onNext(final RunningTask task) { |
| try (LoggingScope ls = loggingScopeFactory.taskRunning(task.getId())) { |
| if (JobDriver.this.handlerManager.getRunningTaskHandler() == 0) { |
| LOG.log(Level.INFO, "RunningTask event received but no CLR handler was bound. Exiting handler."); |
| } else { |
| LOG.log(Level.INFO, "RunningTask will be handled by CLR handler. Task Id: {0}", task.getId()); |
| try { |
| final RunningTaskBridge runningTaskBridge = new RunningTaskBridge(task, activeContextBridgeFactory); |
| NativeInterop.clrSystemRunningTaskHandlerOnNext(JobDriver.this.handlerManager.getRunningTaskHandler(), |
| runningTaskBridge, JobDriver.this.interopLogger); |
| } catch (final Exception ex) { |
| LOG.log(Level.WARNING, "Fail to invoke CLR running task handler"); |
| throw new RuntimeException(ex); |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * Receive notification that the Task is running when driver restarted. |
| */ |
| public final class DriverRestartRunningTaskHandler implements EventHandler<RunningTask> { |
| @Override |
| public void onNext(final RunningTask task) { |
| try (LoggingScope ls = loggingScopeFactory.driverRestartRunningTask(task.getId())) { |
| clock.scheduleAlarm(0, new EventHandler<Alarm>() { |
| @Override |
| public void onNext(final Alarm time) { |
| if (JobDriver.this.handlerManager != null) { |
| if (JobDriver.this.handlerManager.getDriverRestartRunningTaskHandler() != 0) { |
| LOG.log(Level.INFO, "CLR driver restart RunningTask handler implemented, now handle it in CLR."); |
| NativeInterop.clrSystemDriverRestartRunningTaskHandlerOnNext( |
| JobDriver.this.handlerManager.getDriverRestartRunningTaskHandler(), |
| new RunningTaskBridge(task, activeContextBridgeFactory)); |
| } else { |
| LOG.log(Level.WARNING, "No CLR driver restart RunningTask handler implemented, " + |
| "done with DriverRestartRunningTaskHandler."); |
| } |
| } else { |
| LOG.log(Level.INFO, "Waiting for driver to complete restart process " + |
| "before checking out CLR driver restart RunningTaskHandler..."); |
| clock.scheduleAlarm(2000, this); |
| } |
| } |
| }); |
| } |
| } |
| } |
| |
| /** |
| * Receive notification that an context is active on Evaluator when the driver restarted. |
| */ |
| public final class DriverRestartActiveContextHandler implements EventHandler<ActiveContext> { |
| @Override |
| public void onNext(final ActiveContext context) { |
| try (LoggingScope ls = loggingScopeFactory.driverRestartActiveContextReceived(context.getId())) { |
| JobDriver.this.contexts.put(context.getId(), context); |
| LOG.log(Level.INFO, "DriverRestartActiveContextHandler event received: " + context.getId()); |
| clock.scheduleAlarm(0, new EventHandler<Alarm>() { |
| @Override |
| public void onNext(final Alarm time) { |
| if (JobDriver.this.handlerManager != null) { |
| if (JobDriver.this.handlerManager.getDriverRestartActiveContextHandler() != 0) { |
| LOG.log(Level.INFO, "CLR driver restart ActiveContext handler implemented, now handle it in CLR."); |
| NativeInterop.clrSystemDriverRestartActiveContextHandlerOnNext( |
| JobDriver.this.handlerManager.getDriverRestartActiveContextHandler(), |
| activeContextBridgeFactory.getActiveContextBridge(context)); |
| } else { |
| LOG.log(Level.WARNING, "No CLR driver restart ActiveContext handler implemented, " + |
| "done with DriverRestartActiveContextHandler."); |
| } |
| } else { |
| LOG.log(Level.INFO, "Waiting for driver to complete restart process " + |
| "before checking out CLR driver restart DriverRestartActiveContextHandler..."); |
| clock.scheduleAlarm(2000, this); |
| } |
| } |
| }); |
| } |
| } |
| } |
| |
| /** |
| * Job Driver is ready and the clock is set up: request the evaluators. |
| */ |
| public final class StartHandler implements EventHandler<StartTime> { |
| @Override |
| public void onNext(final StartTime startTime) { |
| try (LoggingScope ls = loggingScopeFactory.driverStart(startTime)) { |
| // CLR bridge setup must be done before other event handlers try to access the CLR bridge |
| // thus we grab a lock on this instance |
| synchronized (JobDriver.this) { |
| setupBridge(); |
| LOG.log(Level.INFO, "Finished CLR bridge setup for {0}", startTime); |
| } |
| |
| NativeInterop.callClrSystemOnStartHandler(); |
| LOG.log(Level.INFO, "Driver Started"); |
| } |
| } |
| } |
| |
| |
| /** |
| * Job driver is restarted after previous crash. |
| */ |
| public final class RestartHandler implements EventHandler<DriverRestarted> { |
| @Override |
| public void onNext(final DriverRestarted driverRestarted) { |
| try (LoggingScope ls = loggingScopeFactory.driverRestart(driverRestarted.getStartTime())) { |
| // CLR bridge setup must be done before other event handlers try to access the CLR bridge |
| // thus we lock on this instance |
| synchronized (JobDriver.this) { |
| JobDriver.this.isRestarted = true; |
| setupBridge(); |
| LOG.log(Level.INFO, "Finished CLR bridge setup for {0}", driverRestarted); |
| } |
| |
| NativeInterop.callClrSystemOnRestartHandler(new DriverRestartedBridge(driverRestarted)); |
| LOG.log(Level.INFO, "Driver Restarted"); |
| } |
| } |
| } |
| |
| /** |
| * Receive notification that driver restart has completed. |
| */ |
| public final class DriverRestartCompletedHandler implements EventHandler<DriverRestartCompleted> { |
| @Override |
| public void onNext(final DriverRestartCompleted driverRestartCompleted) { |
| LOG.log(Level.INFO, "Java DriverRestartCompleted event received at time [{0}]. ", |
| driverRestartCompleted.getCompletedTime()); |
| try (LoggingScope ls = loggingScopeFactory.driverRestartCompleted( |
| driverRestartCompleted.getCompletedTime().getTimestamp())) { |
| if (JobDriver.this.handlerManager.getDriverRestartCompletedHandler() != 0) { |
| LOG.log(Level.INFO, "CLR driver restart handler implemented, now handle it in CLR."); |
| |
| NativeInterop.clrSystemDriverRestartCompletedHandlerOnNext( |
| JobDriver.this.handlerManager.getDriverRestartCompletedHandler(), |
| new DriverRestartCompletedBridge(driverRestartCompleted)); |
| } else { |
| LOG.log(Level.WARNING, "No CLR driver restart handler implemented, done with DriverRestartCompletedHandler."); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Shutting down the job driver: close the evaluators. |
| */ |
| final class StopHandler implements EventHandler<StopTime> { |
| @Override |
| public void onNext(final StopTime time) { |
| LOG.log(Level.INFO, " StopTime: {0}", new Object[]{time}); |
| try (LoggingScope ls = loggingScopeFactory.driverStop(time.getTimestamp())) { |
| for (final ActiveContext context : contexts.values()) { |
| context.close(); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Handler for message received from the Task. |
| */ |
| public final class TaskMessageHandler implements EventHandler<TaskMessage> { |
| @Override |
| public void onNext(final TaskMessage taskMessage) { |
| final String msg = new String(taskMessage.get(), StandardCharsets.UTF_8); |
| LOG.log(Level.INFO, "Received TaskMessage: {0} from CLR", msg); |
| //try (LoggingScope ls = loggingScopeFactory.taskMessageReceived(new String(msg))) { |
| if (JobDriver.this.handlerManager.getTaskMessageHandler() != 0) { |
| final TaskMessageBridge taskMessageBridge = new TaskMessageBridge(taskMessage); |
| // if CLR implements the task message handler, handle the bytes in CLR handler |
| NativeInterop.clrSystemTaskMessageHandlerOnNext(JobDriver.this.handlerManager.getTaskMessageHandler(), |
| taskMessage.get(), taskMessageBridge, JobDriver.this.interopLogger); |
| } |
| //} |
| } |
| } |
| |
| /** |
| * Receive notification that the Task has been suspended. |
| */ |
| public final class SuspendedTaskHandler implements EventHandler<SuspendedTask> { |
| @Override |
| public void onNext(final SuspendedTask task) { |
| final String message = "Received notification that task [" + task.getId() + "] has been suspended."; |
| LOG.log(Level.INFO, message); |
| try (LoggingScope ls = loggingScopeFactory.taskSuspended(task.getId())) { |
| if (JobDriver.this.handlerManager.getSuspendedTaskHandler() != 0) { |
| final SuspendedTaskBridge suspendedTaskBridge = new SuspendedTaskBridge(task, activeContextBridgeFactory); |
| // if CLR implements the suspended task handler, handle it in CLR |
| LOG.log(Level.INFO, "Handling the event of suspended task in CLR bridge."); |
| NativeInterop.clrSystemSuspendedTaskHandlerOnNext(JobDriver.this.handlerManager.getSuspendedTaskHandler(), |
| suspendedTaskBridge); |
| } |
| JobDriver.this.jobMessageObserver.sendMessageToClient(JVM_CODEC.encode(message)); |
| } |
| } |
| } |
| |
| /** |
| * Receive notification that the Evaluator has been shut down. |
| */ |
| public final class CompletedEvaluatorHandler implements EventHandler<CompletedEvaluator> { |
| @Override |
| public void onNext(final CompletedEvaluator evaluator) { |
| LOG.log(Level.INFO, " Completed Evaluator {0}", evaluator.getId()); |
| try (LoggingScope ls = loggingScopeFactory.evaluatorCompleted(evaluator.getId())) { |
| if (JobDriver.this.handlerManager.getCompletedEvaluatorHandler() != 0) { |
| final CompletedEvaluatorBridge completedEvaluatorBridge = new CompletedEvaluatorBridge(evaluator); |
| // if CLR implements the completed evaluator handler, handle it in CLR |
| LOG.log(Level.INFO, "Handling the event of completed evaluator in CLR bridge."); |
| NativeInterop.clrSystemCompletedEvaluatorHandlerOnNext( |
| JobDriver.this.handlerManager.getCompletedEvaluatorHandler(), completedEvaluatorBridge); |
| allocatedEvaluatorBridges.remove(completedEvaluatorBridge.getId()); |
| } |
| } |
| } |
| } |
| |
| |
| /** |
| * Receive notification that the Context had completed. |
| * Remove context from the list of active context. |
| */ |
| public final class ClosedContextHandler implements EventHandler<ClosedContext> { |
| @Override |
| public void onNext(final ClosedContext context) { |
| LOG.log(Level.INFO, "Completed Context: {0}", context.getId()); |
| try (LoggingScope ls = loggingScopeFactory.closedContext(context.getId())) { |
| if (JobDriver.this.handlerManager.getClosedContextHandler() != 0) { |
| final ClosedContextBridge closedContextBridge = new ClosedContextBridge(context, activeContextBridgeFactory); |
| // if CLR implements the closed context handler, handle it in CLR |
| LOG.log(Level.INFO, "Handling the event of closed context in CLR bridge."); |
| NativeInterop.clrSystemClosedContextHandlerOnNext(JobDriver.this.handlerManager.getClosedContextHandler(), |
| closedContextBridge); |
| } |
| JobDriver.this.contexts.remove(context.getId()); |
| } |
| } |
| } |
| |
| |
| /** |
| * Receive notification that the Context had failed. |
| * Remove context from the list of active context and notify the client. |
| */ |
| public final class FailedContextHandler implements EventHandler<FailedContext> { |
| @Override |
| public void onNext(final FailedContext context) { |
| LOG.log(Level.SEVERE, "FailedContext", context); |
| try (LoggingScope ls = loggingScopeFactory.evaluatorFailed(context.getId())) { |
| if (JobDriver.this.handlerManager.getFailedContextHandler() != 0) { |
| final FailedContextBridge failedContextBridge = new FailedContextBridge(context, activeContextBridgeFactory); |
| // if CLR implements the failed context handler, handle it in CLR |
| LOG.log(Level.INFO, "Handling the event of failed context in CLR bridge."); |
| NativeInterop.clrSystemFailedContextHandlerOnNext(JobDriver.this.handlerManager.getFailedContextHandler(), |
| failedContextBridge); |
| } |
| JobDriver.this.contexts.remove(context.getId()); |
| final Optional<byte[]> err = context.getData(); |
| if (err.isPresent()) { |
| JobDriver.this.jobMessageObserver.sendMessageToClient(err.get()); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Receive notification that a ContextMessage has been received. |
| */ |
| public final class ContextMessageHandler implements EventHandler<ContextMessage> { |
| @Override |
| public void onNext(final ContextMessage message) { |
| LOG.log(Level.SEVERE, "Received ContextMessage:", message.get()); |
| try (LoggingScope ls = |
| loggingScopeFactory.contextMessageReceived(new String(message.get(), StandardCharsets.UTF_8))) { |
| if (JobDriver.this.handlerManager.getContextMessageHandler() != 0) { |
| final ContextMessageBridge contextMessageBridge = new ContextMessageBridge(message); |
| // if CLR implements the context message handler, handle it in CLR |
| LOG.log(Level.INFO, "Handling the event of context message in CLR bridge."); |
| NativeInterop.clrSystemContextMessageHandlerOnNext(JobDriver.this.handlerManager.getContextMessageHandler(), |
| contextMessageBridge); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Gets the progress of the application from .NET side. |
| */ |
| public final class ProgressProvider implements org.apache.reef.driver.ProgressProvider { |
| @Override |
| public float getProgress() { |
| if (JobDriver.this.handlerManager != null && JobDriver.this.handlerManager.getProgressProvider() != 0) { |
| return NativeInterop.clrSystemProgressProviderGetProgress(JobDriver.this.handlerManager.getProgressProvider()); |
| } |
| |
| return 0f; |
| } |
| } |
| } |