[REEF-1521] Decouple bridge handler setup and DriverStart event delegation
This addressed the issue by
* adding a native method solely for setting up the Java & .NET bridge handler
* removing bridge setup code from the StartHandler/RestartHandler native methods
* decoupling start/restart event trigger code from bridge handler setup
* moving start/restart event trigger code out of `synchronized` blocks
* adding a test for checking that start/restart event handlers do not block other event handlers
* removing the ClrHandlersInitializer interface and its implementations for simplicity
JIRA:
[REEF-1521](https://issues.apache.org/jira/browse/REEF-1521)
Pull Request:
This closes #1095
diff --git a/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp b/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp
index e7b272a..25a4c77 100644
--- a/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp
+++ b/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp
@@ -134,19 +134,14 @@
/*
* Class: org_apache_reef_javabridge_NativeInterop
* Method: callClrSystemOnStartHandler
- * Signature: (Ljava/lang/String;Ljava/lang/String;Lorg/apache/reef/javabridge/BridgeHandlerManager;Lorg/apache/reef/javabridge/EvaluatorRequestorBridge;)V
+ * Signature: ()V
*/
JNIEXPORT void JNICALL Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnStartHandler
-(JNIEnv * env, jclass jclassx, jstring dateTimeString, jstring httpServerPort, jobject jbridgeHandlerManager, jobject jevaluatorRequestorBridge) {
+(JNIEnv * env, jclass jclassx) {
try {
ManagedLog::LOGGER->Log("+Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnStartHandler");
DateTime dt = DateTime::Now;
-
- String^ strPort = ManagedStringFromJavaString(env, httpServerPort);
-
- EvaluatorRequestorClr2Java^ evaluatorRequestorBridge = gcnew EvaluatorRequestorClr2Java(env, jevaluatorRequestorBridge);
- BridgeHandlerManager^ handlerManager = ClrSystemHandlerWrapper::Call_ClrSystemStartHandler_OnStart(dt, strPort, evaluatorRequestorBridge);
- populateJavaBridgeHandlerManager(env, jbridgeHandlerManager, handlerManager);
+ ClrSystemHandlerWrapper::Call_ClrSystemStartHandler_OnStart(dt);
}
catch (System::Exception^ ex) {
// we cannot get error back to java here since we don't have an object to call back (although we ideally should...)
@@ -470,18 +465,14 @@
/*
* Class: org_apache_reef_javabridge_NativeInterop
* Method: callClrSystemOnRestartHandler
- * Signature: (Ljava/lang/String;Lorg/apache/reef/javabridge/BridgeHandlerManager;Lorg/apache/reef/javabridge/EvaluatorRequestorBridge;Lorg/apache/reef/javabridge/DriverRestartedBridge;)V
+ * Signature: (Lorg/apache/reef/javabridge/DriverRestartedBridge;)V
*/
JNIEXPORT void JNICALL Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnRestartHandler
-(JNIEnv * env, jclass jclassx, jstring httpServerPort, jobject jbridgeHandlerManager, jobject jevaluatorRequestorBridge, jobject jdriverRestartedBridge) {
+(JNIEnv * env, jclass jclassx, jobject jdriverRestartedBridge) {
try {
ManagedLog::LOGGER->Log("+Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnRestartHandler");
- String^ strPort = ManagedStringFromJavaString(env, httpServerPort);
-
- EvaluatorRequestorClr2Java^ evaluatorRequestorBridge = gcnew EvaluatorRequestorClr2Java(env, jevaluatorRequestorBridge);
DriverRestartedClr2Java^ driverRestartedBridge = gcnew DriverRestartedClr2Java(env, jdriverRestartedBridge);
- BridgeHandlerManager^ handlerManager = ClrSystemHandlerWrapper::Call_ClrSystemRestartHandler_OnRestart(strPort, evaluatorRequestorBridge, driverRestartedBridge);
- populateJavaBridgeHandlerManager(env, jbridgeHandlerManager, handlerManager);
+ ClrSystemHandlerWrapper::Call_ClrSystemRestartHandler_OnRestart(driverRestartedBridge);
}
catch (System::Exception^ ex) {
// we cannot get error back to java here since we don't have an object to call back (although we ideally should...)
@@ -581,12 +572,32 @@
}
}
+/*
+ * Class: org_apache_reef_javabridge_NativeInterop
+ * Method: clrSystemSetupBridgeHandlerManager
+ * Signature: (Ljava/lang/String;Lorg/apache/reef/javabridge/BridgeHandlerManager;Lorg/apache/reef/javabridge/EvaluatorRequestorBridge;)V
+ */
+JNIEXPORT void JNICALL Java_org_apache_reef_javabridge_NativeInterop_clrSystemSetupBridgeHandlerManager
+(JNIEnv * env, jclass cls, jstring httpServerPort, jobject jbridgeHandlerManager, jobject jevaluatorRequestorBridge) {
+ try {
+ ManagedLog::LOGGER->Log("+Java_org_apache_reef_javabridge_NativeInterop_clrSystemSetupBridgeHandlerManager");
+
+ String^ strPort = ManagedStringFromJavaString(env, httpServerPort);
+ EvaluatorRequestorClr2Java^ evaluatorRequestorBridge = gcnew EvaluatorRequestorClr2Java(env, jevaluatorRequestorBridge);
+ BridgeHandlerManager^ handlerManager = ClrSystemHandlerWrapper::Call_ClrSystem_SetupBridgeHandlerManager(strPort, evaluatorRequestorBridge);
+ populateJavaBridgeHandlerManager(env, jbridgeHandlerManager, handlerManager);
+ }
+ catch (System::Exception^ ex) {
+ ManagedLog::LOGGER->LogError("Exceptions in Java_org_apache_reef_javabridge_NativeInterop_clrSystemSetupBridgeHandlerManager", ex);
+ }
+}
+
static JNINativeMethod methods[] = {
{ "loadClrAssembly", "(Ljava/lang/String;)V", (void*)&Java_org_apache_reef_javabridge_NativeInterop_loadClrAssembly },
{ "clrBufferedLog", "(ILjava/lang/String;)V", (void*)&Java_org_apache_reef_javabridge_NativeInterop_clrBufferedLog },
- { "callClrSystemOnStartHandler", "(Ljava/lang/String;Ljava/lang/String;Lorg/apache/reef/javabridge/BridgeHandlerManager;Lorg/apache/reef/javabridge/EvaluatorRequestorBridge;)V",
+ { "callClrSystemOnStartHandler", "()V",
(void*)&Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnStartHandler },
{ "clrSystemAllocatedEvaluatorHandlerOnNext", "(JLorg/apache/reef/javabridge/AllocatedEvaluatorBridge;Lorg/apache/reef/javabridge/InteropLogger;)V",
@@ -628,7 +639,7 @@
{ "clrSystemContextMessageHandlerOnNext", "(JLorg/apache/reef/javabridge/ContextMessageBridge;)V",
(void*)&Java_org_apache_reef_javabridge_NativeInterop_clrSystemContextMessageHandlerOnNext },
- { "callClrSystemOnRestartHandler", "(Ljava/lang/String;Lorg/apache/reef/javabridge/BridgeHandlerManager;Lorg/apache/reef/javabridge/EvaluatorRequestorBridge;Lorg/apache/reef/javabridge/DriverRestartedBridge;)V",
+ { "callClrSystemOnRestartHandler", "(Lorg/apache/reef/javabridge/DriverRestartedBridge;)V",
(void*)&Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnRestartHandler },
{ "clrSystemDriverRestartActiveContextHandlerOnNext", "(JLorg/apache/reef/javabridge/ActiveContextBridge;)V",
@@ -645,6 +656,9 @@
{ "clrSystemProgressProviderGetProgress", "(J)F",
(void*)&Java_org_apache_reef_javabridge_NativeInterop_clrSystemProgressProviderGetProgress },
+
+ { "clrSystemSetupBridgeHandlerManager", "(Ljava/lang/String;Lorg/apache/reef/javabridge/BridgeHandlerManager;Lorg/apache/reef/javabridge/EvaluatorRequestorBridge;)V",
+ (void*)&Java_org_apache_reef_javabridge_NativeInterop_clrSystemSetupBridgeHandlerManager },
};
JNIEXPORT void JNICALL
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs
index 9f3497f..0895aa0 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs
@@ -245,37 +245,47 @@
}
}
- public static BridgeHandlerManager Call_ClrSystemStartHandler_OnStart(
- DateTime startTime,
+ /// <summary>
+ /// Invokes event handlers registered to the driver start event.
+ /// </summary>
+ /// <param name="startTime"><see cref="DateTime"/> object that represents when this method was called.</param>
+ public static void Call_ClrSystemStartHandler_OnStart(DateTime startTime)
+ {
+ using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemStartHandler_OnStart"))
+ {
+ LOGGER.Log(Level.Info, "*** Start time is " + startTime);
+ _driverBridge.StartHandlersOnNext(startTime);
+ }
+ }
+
+ /// <summary>
+ /// Invokes event handlers registered to the driver restart event.
+ /// </summary>
+ /// <param name="driverRestartedClr2Java">Proxy object to the Java driver restart event object.</param>
+ public static void Call_ClrSystemRestartHandler_OnRestart(IDriverRestartedClr2Java driverRestartedClr2Java)
+ {
+ using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemRestartHandler_OnRestart"))
+ {
+ LOGGER.Log(Level.Info, "*** Restart time is " + driverRestartedClr2Java.GetStartTime());
+ _driverBridge.RestartHandlerOnNext(driverRestartedClr2Java);
+ }
+ }
+
+ /// <summary>
+ /// Configure and return a manager object holding all subscriptions given to REEF events on the .NET side.
+ /// </summary>
+ /// <param name="httpServerPort">String representation of the http port of the Java-side driver.</param>
+ /// <param name="evaluatorRequestorClr2Java">Proxy object to the Java evaluator requestor object.</param>
+ /// <returns><see cref="BridgeHandlerManager"/> object that contains .NET handles for each REEF event.</returns>
+ public static BridgeHandlerManager Call_ClrSystem_SetupBridgeHandlerManager(
string httpServerPort,
IEvaluatorRequestorClr2Java evaluatorRequestorClr2Java)
{
IEvaluatorRequestor evaluatorRequestor = new EvaluatorRequestor(evaluatorRequestorClr2Java);
- using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemStartHandler_OnStart"))
+ using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystem_SetupBridgeHandlerManager"))
{
- LOGGER.Log(Level.Info, "*** Start time is " + startTime);
LOGGER.Log(Level.Info, "*** httpServerPort: " + httpServerPort);
- var handlers = GetHandlers(httpServerPort, evaluatorRequestor);
- _driverBridge.StartHandlersOnNext(startTime);
-
- return handlers;
- }
- }
-
- public static BridgeHandlerManager Call_ClrSystemRestartHandler_OnRestart(
- string httpServerPort,
- IEvaluatorRequestorClr2Java evaluatorRequestorClr2Java,
- IDriverRestartedClr2Java driverRestartedClr2Java)
- {
- IEvaluatorRequestor evaluatorRequestor = new EvaluatorRequestor(evaluatorRequestorClr2Java);
- using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemRestartHandler_OnRestart"))
- {
- LOGGER.Log(Level.Info, "*** Restart time is " + driverRestartedClr2Java.GetStartTime());
- LOGGER.Log(Level.Info, "*** httpServerPort: " + httpServerPort);
- var handlers = GetHandlers(httpServerPort, evaluatorRequestor);
- _driverBridge.RestartHandlerOnNext(driverRestartedClr2Java);
-
- return handlers;
+ return GetHandlers(httpServerPort, evaluatorRequestor);
}
}
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestDriverConcurrency.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestDriverConcurrency.cs
new file mode 100644
index 0000000..0c3581f
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestDriverConcurrency.cs
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Threading;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Logging;
+using Xunit;
+
+namespace Org.Apache.REEF.Tests.Functional.Bridge
+{
+ /// <summary>
+ /// Test class containing tests related to the concurrency of the .NET driver.
+ /// </summary>
+ [Collection("FunctionalTests")]
+ public sealed class TestDriverConcurrency : ReefFunctionalTest
+ {
+ private const int EvalNum = 1;
+ private const int DriverWaitTimeoutMilliseconds = 60000;
+
+ /// <summary>
+ /// Check that event handlers for the driver start event and the evaluator allocated event can be run concurrently.
+ /// </summary>
+ [Fact]
+ public void TestStartHandlerAndEvalAllocatedHandlerOnLocalRuntime()
+ {
+ string testFolder = DefaultRuntimeFolder + Guid.NewGuid().ToString("N").Substring(0, 4);
+ TestRun(GetDriverConfiguration(),
+ typeof(DriverConcurrencyTestDriver),
+ EvalNum,
+ "TestStartHandlerAndEvalAllocatedHandlerOnLocalRuntime",
+ "local",
+ testFolder);
+ ValidateSuccessForLocalRuntime(0, testFolder: testFolder);
+ CleanUp(testFolder);
+ }
+
+ private IConfiguration GetDriverConfiguration()
+ {
+ return DriverConfiguration.ConfigurationModule
+ .Set(DriverConfiguration.OnDriverStarted, GenericType<DriverConcurrencyTestDriver>.Class)
+ .Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<DriverConcurrencyTestDriver>.Class)
+ .Build();
+ }
+
+ private sealed class DriverConcurrencyTestDriver : IObserver<IDriverStarted>, IObserver<IAllocatedEvaluator>
+ {
+ private static readonly Logger Logger = Logger.GetLogger(typeof(DriverConcurrencyTestDriver));
+ private readonly IEvaluatorRequestor _evaluatorRequestor;
+ private readonly CountdownEvent _countdownEvent;
+
+ [Inject]
+ private DriverConcurrencyTestDriver(IEvaluatorRequestor evaluatorRequestor)
+ {
+ _evaluatorRequestor = evaluatorRequestor;
+ _countdownEvent = new CountdownEvent(EvalNum);
+ }
+
+ public void OnNext(IDriverStarted driverStarted)
+ {
+ Logger.Log(Level.Info, "Requesting {0} evaluators.", EvalNum);
+ _evaluatorRequestor.Submit(_evaluatorRequestor.NewBuilder()
+ .SetNumber(EvalNum)
+ .Build());
+
+ // wait until the expected number of evaluator allocated events have fired
+ Assert.True(_countdownEvent.Wait(DriverWaitTimeoutMilliseconds));
+ }
+
+ public void OnNext(IAllocatedEvaluator allocatedEvaluator)
+ {
+ Logger.Log(Level.Info, "Trigger {0} and close {1}", _countdownEvent, allocatedEvaluator.Id);
+ _countdownEvent.Signal();
+ allocatedEvaluator.Dispose();
+ }
+
+ public void OnCompleted()
+ {
+ throw new NotImplementedException();
+ }
+
+ public void OnError(Exception error)
+ {
+ throw new NotImplementedException();
+ }
+ }
+ }
+}
diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
index d043ab1..4774eb0 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
@@ -78,6 +78,7 @@
<Compile Include="Functional\Bridge\TestBridgeClient.cs" />
<Compile Include="Functional\Bridge\TestCloseTask.cs" />
<Compile Include="Functional\Bridge\TestContextStack.cs" />
+ <Compile Include="Functional\Bridge\TestDriverConcurrency.cs" />
<Compile Include="Functional\Bridge\TestFailedEvaluatorEventHandler.cs" />
<Compile Include="Functional\Common\Task\ExceptionTask.cs" />
<Compile Include="Functional\Common\Task\NullTask.cs" />
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java
index 6de53ef..83f783f 100644
--- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java
@@ -35,11 +35,7 @@
public static native void clrBufferedLog(final int level, final String message);
- public static native void callClrSystemOnStartHandler(
- final String dateTime,
- final String httpServerPortNumber,
- final BridgeHandlerManager bridgeHandlerManager,
- final EvaluatorRequestorBridge javaEvaluatorRequestorBridge);
+ public static native void callClrSystemOnStartHandler();
public static native void clrSystemAllocatedEvaluatorHandlerOnNext(
final long handle,
@@ -116,9 +112,6 @@
);
public static native void callClrSystemOnRestartHandler(
- final String httpServerPortNumber,
- final BridgeHandlerManager bridgeHandlerManager,
- final EvaluatorRequestorBridge javaEvaluatorRequestorBridge,
final DriverRestartedBridge driverRestartedBridge
);
@@ -145,6 +138,12 @@
public static native float clrSystemProgressProviderGetProgress(final long handle);
+ public static native void clrSystemSetupBridgeHandlerManager(
+ final String httpServerPortNumber,
+ final BridgeHandlerManager bridgeHandlerManager,
+ final EvaluatorRequestorBridge javaEvaluatorRequestorBridge
+ );
+
/**
* Empty private constructor to prohibit instantiation of utility class.
*/
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/ClrHandlersInitializer.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/ClrHandlersInitializer.java
deleted file mode 100644
index a151fd1..0000000
--- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/ClrHandlersInitializer.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.javabridge.generic;
-
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.annotations.audience.DriverSide;
-import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.javabridge.BridgeHandlerManager;
-import org.apache.reef.javabridge.EvaluatorRequestorBridge;
-
-/**
- * An initializer interface that initializes ClrHandlers for the CLR {@link JobDriver}.
- */
-@DriverSide
-@Private
-@Unstable
-interface ClrHandlersInitializer {
-
- /**
- * Returns the set of CLR handles.
- */
- BridgeHandlerManager getClrHandlers(final String portNumber, final EvaluatorRequestorBridge evaluatorRequestorBridge);
-}
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverRestartClrHandlersInitializer.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverRestartClrHandlersInitializer.java
deleted file mode 100644
index 1da5347..0000000
--- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverRestartClrHandlersInitializer.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.javabridge.generic;
-
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.annotations.audience.DriverSide;
-import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.driver.restart.DriverRestarted;
-import org.apache.reef.javabridge.BridgeHandlerManager;
-import org.apache.reef.javabridge.DriverRestartedBridge;
-import org.apache.reef.javabridge.EvaluatorRequestorBridge;
-import org.apache.reef.javabridge.NativeInterop;
-
-/**
- * An initializer implementation that initializes ClrHandlers for the CLR {@link JobDriver} for the case
- * where the Driver has restarted.
- */
-@Private
-@DriverSide
-@Unstable
-final class DriverRestartClrHandlersInitializer implements ClrHandlersInitializer {
- private final DriverRestarted driverRestarted;
-
- DriverRestartClrHandlersInitializer(final DriverRestarted driverRestarted) {
- this.driverRestarted = driverRestarted;
- }
-
- @Override
- public BridgeHandlerManager getClrHandlers(final String portNumber,
- final EvaluatorRequestorBridge evaluatorRequestorBridge) {
- final BridgeHandlerManager bridgeHandlerManager = new BridgeHandlerManager();
- NativeInterop.callClrSystemOnRestartHandler(portNumber, bridgeHandlerManager, evaluatorRequestorBridge,
- new DriverRestartedBridge(driverRestarted));
-
- return bridgeHandlerManager;
- }
-}
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverStartClrHandlersInitializer.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverStartClrHandlersInitializer.java
deleted file mode 100644
index 765869c..0000000
--- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverStartClrHandlersInitializer.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.javabridge.generic;
-
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.annotations.audience.DriverSide;
-import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.javabridge.BridgeHandlerManager;
-import org.apache.reef.javabridge.EvaluatorRequestorBridge;
-import org.apache.reef.javabridge.NativeInterop;
-import org.apache.reef.wake.time.event.StartTime;
-
-/**
- * An initializer implementation that initializes ClrHandlers for the CLR {@link JobDriver} for the case
- * of regular Driver startup.
- */
-@Private
-@DriverSide
-@Unstable
-final class DriverStartClrHandlersInitializer implements ClrHandlersInitializer {
- private final StartTime startTime;
-
- DriverStartClrHandlersInitializer(final StartTime startTime) {
- this.startTime = startTime;
- }
-
- @Override
- public BridgeHandlerManager getClrHandlers(final String portNumber,
- final EvaluatorRequestorBridge evaluatorRequestorBridge) {
- BridgeHandlerManager bridgeHandlerManager = new BridgeHandlerManager();
- NativeInterop.callClrSystemOnStartHandler(startTime.toString(), portNumber, bridgeHandlerManager,
- evaluatorRequestorBridge);
-
- return bridgeHandlerManager;
- }
-}
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
index b0c8d41..8a0fb86 100644
--- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
@@ -167,7 +167,7 @@
this.definedRuntimes = definedRuntimes;
}
- private void setupBridge(final ClrHandlersInitializer initializer) {
+ private void setupBridge() {
// Signal to the clr buffered log handler that the driver has started and that
// we can begin logging
LOG.log(Level.INFO, "Initializing CLRBufferedLogHandler...");
@@ -196,7 +196,9 @@
this.evaluatorRequestorBridge =
new EvaluatorRequestorBridge(JobDriver.this.evaluatorRequestor, false, loggingScopeFactory,
JobDriver.this.definedRuntimes);
- JobDriver.this.handlerManager = initializer.getClrHandlers(portNumber, evaluatorRequestorBridge);
+ JobDriver.this.handlerManager = new BridgeHandlerManager();
+ NativeInterop.clrSystemSetupBridgeHandlerManager(portNumber,
+ JobDriver.this.handlerManager, evaluatorRequestorBridge);
try (final LoggingScope lp =
this.loggingScopeFactory.getNewLoggingScope("setupBridge::clrSystemHttpServerHandlerOnNext")) {
@@ -581,11 +583,15 @@
@Override
public void onNext(final StartTime startTime) {
try (final LoggingScope ls = loggingScopeFactory.driverStart(startTime)) {
+ // CLR bridge setup must be done before other event handlers try to access the CLR bridge
+ // thus we grab a lock on this instance
synchronized (JobDriver.this) {
-
- setupBridge(new DriverStartClrHandlersInitializer(startTime));
- LOG.log(Level.INFO, "Driver Started");
+ setupBridge();
+ LOG.log(Level.INFO, "Finished CLR bridge setup for {0}", startTime);
}
+
+ NativeInterop.callClrSystemOnStartHandler();
+ LOG.log(Level.INFO, "Driver Started");
}
}
}
@@ -598,13 +604,16 @@
@Override
public void onNext(final DriverRestarted driverRestarted) {
try (final LoggingScope ls = loggingScopeFactory.driverRestart(driverRestarted.getStartTime())) {
+ // CLR bridge setup must be done before other event handlers try to access the CLR bridge
+ // thus we lock on this instance
synchronized (JobDriver.this) {
-
JobDriver.this.isRestarted = true;
- setupBridge(new DriverRestartClrHandlersInitializer(driverRestarted));
-
- LOG.log(Level.INFO, "Driver Restarted and CLR bridge set up.");
+ setupBridge();
+ LOG.log(Level.INFO, "Finished CLR bridge setup for {0}", driverRestarted);
}
+
+ NativeInterop.callClrSystemOnRestartHandler(new DriverRestartedBridge(driverRestarted));
+ LOG.log(Level.INFO, "Driver Restarted");
}
}
}