[REEF-1939] Implement network transport for the Java-CLR bridge in C#
Summary of changes:
* Implement `NetworkTransport` wrapper and `LocalObserver` handler for incoming messages
* Add Java bridge endpoint file name to `REEFFileNames` constants
* Refactor the Avro protocol for the bridge
* Minor refactoring and code cleanup
* The bulk of the classes in REEF-1939, 1938, and 1936
were pulled from an uncommited pull request for REEF-1763
implemented by Doug Service.
JIRA: [REEF-1939](https://issues.apache.org/jira/browse/REEF-1939)
Pull Request:
Closes #1406
diff --git a/lang/cs/Org.Apache.REEF.Bridge.CLR/LocalObserver.cs b/lang/cs/Org.Apache.REEF.Bridge.CLR/LocalObserver.cs
new file mode 100644
index 0000000..d76e937
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Bridge.CLR/LocalObserver.cs
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations.InjectionPlan;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Avro;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Bridge
+{
+ /// <summary>
+ /// The Local Observer class receives byte buffer messages from the transport layer,
+ /// deserializes the messages into Avro C# classes, and invokes the appropriate
+ /// IObserver callback on the Avro message observer.
+ /// </summary>
+ public sealed class LocalObserver : IObserver<IRemoteMessage<byte[]>>
+ {
+ /// <summary>
+ /// Specify the class that will be called to process deserialied Avro messages.
+ /// </summary>
+ [NamedParameter(Documentation =
+ "Must implement IObserver<IMessageInstance<TMessage>> for messages to be received.")]
+ public class MessageObserver : Name<object>
+ {
+ }
+
+ private static readonly Logger Logger = Logger.GetLogger(typeof(LocalObserver));
+
+ private readonly ProtocolSerializer _serializer;
+ private readonly IInjectionFuture<object> _fMessageObserver;
+
+ /// <param name="serializer">The protocol serializer instance to be used.</param>
+ /// <param name="fMessageObserver">An injection future with message observer to be
+ /// called to process Avro messages from the Java bridge.</param>
+ [Inject]
+ private LocalObserver(
+ ProtocolSerializer serializer,
+ [Parameter(typeof(MessageObserver))] IInjectionFuture<object> fMessageObserver)
+ {
+ _serializer = serializer;
+ _fMessageObserver = fMessageObserver;
+ }
+
+ /// <summary>
+ /// Called by the remote manager to process messages received from the java bridge.
+ /// </summary>
+ /// <param name="message">A byte buffer containing a serialzied message.</param>
+ public void OnNext(IRemoteMessage<byte[]> message)
+ {
+ Logger.Log(Level.Verbose, "Message received: {0}", message.Identifier);
+
+ // Deserialize the message and invoke the appropriate handler.
+ _serializer.Read(message.Message, _fMessageObserver.Get());
+ }
+
+ /// <summary>
+ /// Handles error conditions in the low transport layer.
+ /// </summary>
+ /// <param name="error">The exception generated in the transport layer.</param>
+ public void OnError(Exception error)
+ {
+ Logger.Log(Level.Error, "Error in the transport layer", error);
+ }
+
+ /// <summary>
+ /// Notification that no more message processing is required.
+ /// </summary>
+ public void OnCompleted()
+ {
+ Logger.Log(Level.Info, "Completed");
+ }
+ }
+}
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.Bridge.CLR/Message/BridgeInterop.cs b/lang/cs/Org.Apache.REEF.Bridge.CLR/Message/BridgeInterop.cs
deleted file mode 100644
index 2e862a7..0000000
--- a/lang/cs/Org.Apache.REEF.Bridge.CLR/Message/BridgeInterop.cs
+++ /dev/null
@@ -1,83 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-//<auto-generated />
-namespace org.apache.reef.bridge.message
-{
- using System;
- using System.Collections.Generic;
- using System.Runtime.Serialization;
- using Microsoft.Hadoop.Avro;
-
- /// <summary>
- /// Used to serialize and deserialize Avro record org.apache.reef.bridge.message.BridgeInterop.
- /// </summary>
- [DataContract(Namespace = "org.apache.reef.bridge.message")]
- public partial class BridgeInterop
- {
- private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.bridge.message.BridgeInterop"",""doc"":""Container message for all Java/CLR bridge messages in the protocol."",""fields"":[{""name"":""sequence"",""doc"":""The unique squence identifier of the message in the protocol stream."",""type"":""long""},{""name"":""messageType"",""doc"":""The type of the bridge Java/CLR interop message."",""type"":{""type"":""enum"",""name"":""org.apache.reef.bridge.message.MessageType"",""doc"":""An enumeration of all possible Java/C# bridge protocol messages."",""symbols"":[""SetupBridge"",""SystemOnStart"",""Acknowledgement""]}},{""name"":""message"",""doc"":""A union which contains the actual message."",""type"":[{""type"":""record"",""name"":""org.apache.reef.bridge.message.SetupBridge"",""doc"":""Notify the C# bridge of the http port of the Java bridge webserver."",""fields"":[{""name"":""httpServerPortNumber"",""doc"":""The Java bridge http server port number."",""type"":""int""}]},{""type"":""record"",""name"":""org.apache.reef.bridge.message.SystemOnStart"",""doc"":""Notify the C# bridge the system is now running."",""fields"":[{""name"":""dateTime"",""doc"":""Date time in seconds as a long since January 1, 1970"",""type"":""long""}]},{""type"":""record"",""name"":""org.apache.reef.bridge.message.Acknowledgement"",""doc"":""The Acknowledgement message is sent to the Java bridge to acknowledge receipt and processing of a specific message."",""fields"":[{""name"":""messageIdentifier"",""doc"":""The message identifier of the message that was successfully processed."",""type"":""long""}]},{""type"":""array"",""items"":""bytes""}]}]}";
-
- /// <summary>
- /// Gets the schema.
- /// </summary>
- public static string Schema
- {
- get
- {
- return JsonSchema;
- }
- }
-
- /// <summary>
- /// Gets or sets the sequence field.
- /// </summary>
- [DataMember]
- public long sequence { get; set; }
-
- /// <summary>
- /// Gets or sets the messageType field.
- /// </summary>
- [DataMember]
- public org.apache.reef.bridge.message.MessageType messageType { get; set; }
-
- /// <summary>
- /// Gets or sets the message field.
- /// </summary>
- [DataMember]
- [AvroUnion(typeof(org.apache.reef.bridge.message.SetupBridge), typeof(org.apache.reef.bridge.message.SystemOnStart), typeof(org.apache.reef.bridge.message.Acknowledgement), typeof(List<byte[]>))]
- public object message { get; set; }
-
- /// <summary>
- /// Initializes a new instance of the <see cref="BridgeInterop"/> class.
- /// </summary>
- public BridgeInterop()
- {
- }
-
- /// <summary>
- /// Initializes a new instance of the <see cref="BridgeInterop"/> class.
- /// </summary>
- /// <param name="sequence">The sequence.</param>
- /// <param name="messageType">The messageType.</param>
- /// <param name="message">The message.</param>
- public BridgeInterop(long sequence, org.apache.reef.bridge.message.MessageType messageType, object message)
- {
- this.sequence = sequence;
- this.messageType = messageType;
- this.message = message;
- }
- }
-}
diff --git a/lang/cs/Org.Apache.REEF.Bridge.CLR/Message/Protocol.cs b/lang/cs/Org.Apache.REEF.Bridge.CLR/Message/BridgeProtocol.cs
similarity index 77%
rename from lang/cs/Org.Apache.REEF.Bridge.CLR/Message/Protocol.cs
rename to lang/cs/Org.Apache.REEF.Bridge.CLR/Message/BridgeProtocol.cs
index 2f88621..deab9df 100644
--- a/lang/cs/Org.Apache.REEF.Bridge.CLR/Message/Protocol.cs
+++ b/lang/cs/Org.Apache.REEF.Bridge.CLR/Message/BridgeProtocol.cs
@@ -23,12 +23,12 @@
using Microsoft.Hadoop.Avro;
/// <summary>
- /// Used to serialize and deserialize Avro record org.apache.reef.bridge.message.Protocol.
+ /// Used to serialize and deserialize Avro record org.apache.reef.bridge.message.BridgeProtocol.
/// </summary>
[DataContract(Namespace = "org.apache.reef.bridge.message")]
- public partial class Protocol
+ public partial class BridgeProtocol
{
- private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.bridge.message.Protocol"",""doc"":""Negotiate Java/C# bridge protocol messages."",""fields"":[{""name"":""offset"",""doc"":""The index offset of the message identifiers."",""type"":""int""}]}";
+ private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.bridge.message.BridgeProtocol"",""doc"":""Negotiate Java/C# bridge protocol messages."",""fields"":[{""name"":""offset"",""doc"":""The index offset of the message identifiers."",""type"":""int""}]}";
/// <summary>
/// Gets the schema.
@@ -48,17 +48,17 @@
public int offset { get; set; }
/// <summary>
- /// Initializes a new instance of the <see cref="Protocol"/> class.
+ /// Initializes a new instance of the <see cref="BridgeProtocol"/> class.
/// </summary>
- public Protocol()
+ public BridgeProtocol()
{
}
/// <summary>
- /// Initializes a new instance of the <see cref="Protocol"/> class.
+ /// Initializes a new instance of the <see cref="BridgeProtocol"/> class.
/// </summary>
/// <param name="offset">The offset.</param>
- public Protocol(int offset)
+ public BridgeProtocol(int offset)
{
this.offset = offset;
}
diff --git a/lang/cs/Org.Apache.REEF.Bridge.CLR/Message/MessageType.cs b/lang/cs/Org.Apache.REEF.Bridge.CLR/Message/MessageType.cs
deleted file mode 100644
index be59777..0000000
--- a/lang/cs/Org.Apache.REEF.Bridge.CLR/Message/MessageType.cs
+++ /dev/null
@@ -1,47 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-//<auto-generated />
-namespace org.apache.reef.bridge.message
-{
- using System.Runtime.Serialization;
-
- /// <summary>
- /// Used to serialize and deserialize Avro enum org.apache.reef.bridge.message.MessageType.
- /// </summary>
- [DataContract(Namespace = "org.apache.reef.bridge.message")]
- public enum MessageType
- {
- /// <summary>
- /// The SetupBridge symbol.
- /// </summary>
- [EnumMember]
- SetupBridge,
-
- /// <summary>
- /// The SystemOnStart symbol.
- /// </summary>
- [EnumMember]
- SystemOnStart,
-
- /// <summary>
- /// The Acknowledgement symbol.
- /// </summary>
- [EnumMember]
- Acknowledgement,
-
- }
-}
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.Bridge.CLR/NetworkTransport.cs b/lang/cs/Org.Apache.REEF.Bridge.CLR/NetworkTransport.cs
new file mode 100644
index 0000000..ed23a90
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Bridge.CLR/NetworkTransport.cs
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Concurrent;
+using System.IO;
+using System.Net;
+using org.apache.reef.bridge.message;
+using Org.Apache.REEF.Common.Files;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Avro;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+
+namespace Org.Apache.REEF.Bridge
+{
+ /// <summary>
+ /// The CLR Bridge Network class aggregates a RemoteManager and
+ /// Protocol Serializer to provide a simple send/receive interface
+ /// between the CLR and Java bridges.
+ /// </summary>
+ public sealed class NetworkTransport
+ {
+ private static readonly Logger Logger = Logger.GetLogger(typeof(NetworkTransport));
+
+ private readonly BlockingCollection<byte[]> _queue = new BlockingCollection<byte[]>();
+
+ private readonly ProtocolSerializer _serializer;
+ private readonly IRemoteManager<byte[]> _remoteManager;
+ private readonly IObserver<byte[]> _remoteObserver;
+ private readonly REEFFileNames _fileNames;
+
+ /// <summary>
+ /// Construct a network stack using the wake remote manager.
+ /// </summary>
+ /// <param name="localAddressProvider">An address provider used to obtain
+ /// a local IP address on an open port.</param>
+ /// <param name="serializer">Serializer/deserializer of the bridge messages.</param>
+ /// <param name="localObserver">Handler of the incoming bridge messages.</param>
+ /// <param name="remoteManagerFactory">RemoteManager factory.
+ /// We need a new instance of the RM to communicate with the Java side of the bridge.</param>
+ /// <param name="filenames">Collection of global constants for file paths and such.</param>
+ [Inject]
+ private NetworkTransport(
+ ILocalAddressProvider localAddressProvider,
+ ProtocolSerializer serializer,
+ LocalObserver localObserver,
+ IRemoteManagerFactory remoteManagerFactory,
+ REEFFileNames fileNames)
+ {
+ _serializer = serializer;
+ _fileNames = fileNames;
+
+ // Instantiate the remote manager.
+ _remoteManager = remoteManagerFactory.GetInstance(localAddressProvider.LocalAddress, new ByteCodec());
+
+ // Listen to the java bridge on the local end point.
+ _remoteManager.RegisterObserver(localObserver);
+ Logger.Log(Level.Info, "Local observer listening to java bridge on: [{0}]", _remoteManager.LocalEndpoint);
+
+ // Instantiate a remote observer to send messages to the java bridge.
+ IPEndPoint javaIpEndPoint = GetJavaBridgeEndpoint();
+ Logger.Log(Level.Info, "Connecting to java bridge on: [{0}]", javaIpEndPoint);
+ _remoteObserver = _remoteManager.GetRemoteObserver(javaIpEndPoint);
+
+ // Negotiate the protocol.
+ Send(0, new BridgeProtocol(100));
+ }
+
+ /// <summary>
+ /// Send a message to the java side of the bridge.
+ /// </summary>
+ /// <param name="identifier">A long value that which is the unique sequence identifier of the message.</param>
+ /// <param name="message">An object reference to a message in the org.apache.reef.bridge.message package.</param>
+ public void Send(long identifier, object message)
+ {
+ Logger.Log(Level.Verbose, "Sending message: {0}", message);
+ _remoteObserver.OnNext(_serializer.Write(message, identifier));
+ }
+
+ /// <summary>
+ /// Retrieves the address of the java bridge.
+ /// </summary>
+ /// <returns>IP address and port of the Java bridge.</returns>
+ private IPEndPoint GetJavaBridgeEndpoint()
+ {
+ using (FileStream stream = File.Open(_fileNames.DriverJavaBridgeEndpointFileName, FileMode.Open))
+ {
+ using (StreamReader reader = new StreamReader(stream))
+ {
+ string javaBridgeAddress = reader.ReadToEnd();
+ Logger.Log(Level.Info, "Java bridge address: {0}", javaBridgeAddress);
+
+ string[] javaAddressStrs = javaBridgeAddress.Split(':');
+ IPAddress javaBridgeIpAddress = IPAddress.Parse(javaAddressStrs[0]);
+ int port = int.Parse(javaAddressStrs[1]);
+ return new IPEndPoint(javaBridgeIpAddress, port);
+ }
+ }
+ }
+ }
+}
diff --git a/lang/cs/Org.Apache.REEF.Bridge.CLR/Org.Apache.REEF.Bridge.CLR.csproj b/lang/cs/Org.Apache.REEF.Bridge.CLR/Org.Apache.REEF.Bridge.CLR.csproj
index dbbe571..20500c3 100644
--- a/lang/cs/Org.Apache.REEF.Bridge.CLR/Org.Apache.REEF.Bridge.CLR.csproj
+++ b/lang/cs/Org.Apache.REEF.Bridge.CLR/Org.Apache.REEF.Bridge.CLR.csproj
@@ -53,12 +53,12 @@
<Compile Include="$(SolutionDir)\SharedAssemblyInfo.cs">
<Link>Properties\SharedAssemblyInfo.cs</Link>
</Compile>
+ <Compile Include="LocalObserver.cs" />
<Compile Include="Message\Acknowledgement.cs" />
- <Compile Include="Message\BridgeInterop.cs" />
- <Compile Include="Message\MessageType.cs" />
- <Compile Include="Message\Protocol.cs" />
+ <Compile Include="Message\BridgeProtocol.cs" />
<Compile Include="Message\SetupBridge.cs" />
<Compile Include="Message\SystemOnStart.cs" />
+ <Compile Include="NetworkTransport.cs" />
</ItemGroup>
<ItemGroup>
<None Include="Message\README.md" />
@@ -92,4 +92,4 @@
<Import Project="$(SolutionDir)\AvroCodeGeneration.Targets" Condition="Exists('$(SolutionDir)\AvroCodeGeneration.Targets')" />
<Import Project="$(SolutionDir)\.nuget\NuGet.targets" Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" />
<Import Project="$(PackagesDir)\StyleCop.MSBuild.$(StyleCopVersion)\build\StyleCop.MSBuild.Targets" Condition="Exists('$(PackagesDir)\StyleCop.MSBuild.$(StyleCopVersion)\build\StyleCop.MSBuild.Targets')" />
-</Project>
\ No newline at end of file
+</Project>
diff --git a/lang/cs/Org.Apache.REEF.Common/Files/REEFFileNames.cs b/lang/cs/Org.Apache.REEF.Common/Files/REEFFileNames.cs
index 5cb272e..4c21d1e 100644
--- a/lang/cs/Org.Apache.REEF.Common/Files/REEFFileNames.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Files/REEFFileNames.cs
@@ -50,6 +50,7 @@
private const string CLR_DRIVER_CONFIGURATION_NAME = "clrdriver.conf";
private const string CLR_BRIDGE_CONFIGURATION_NAME = "clrBridge.config";
private const string DRIVER_HTTP_ENDPOINT_FILE_NAME = "DriverHttpEndpoint.txt";
+ private const string DRIVER_JAVA_BRIDGE_ENDPOINT_FILE_NAME = "DriverJavaBridgeEndpoint.txt";
private const string BRIDGE_EXE_NAME = "Org.Apache.REEF.Bridge.exe";
private const string BRIDGE_EXE_CONFIG_NAME = "Org.Apache.REEF.Bridge.exe.config";
private const string SECURITY_TOKEN_IDENTIFIER_FILE = "SecurityTokenId";
@@ -338,12 +339,19 @@
}
/// <summary>
+ /// Name of the file that contains the driver name server address and port.
/// </summary>
+ /// <returns>File name that contains the dfs path for the DriverNameServerEndpoint</returns>
+ public string DriverJavaBridgeEndpointFileName
+ {
+ get { return DRIVER_JAVA_BRIDGE_ENDPOINT_FILE_NAME; }
+ }
+
/// <returns>File name that contains the dfs path for the DriverHttpEndpoint</returns>
[Unstable("0.13", "Working in progress for what to return after submit")]
- public string DriverHttpEndpoint
- {
- get { return DRIVER_HTTP_ENDPOINT_FILE_NAME; }
+ public string DriverHttpEndpoint
+ {
+ get { return DRIVER_HTTP_ENDPOINT_FILE_NAME; }
}
private static readonly string GLOBAL_FOLDER_PATH = Path.Combine(REEF_BASE_FOLDER, GLOBAL_FOLDER);
@@ -358,4 +366,4 @@
private static readonly string EVALUATOR_CONFIGURATION_PATH =
Path.Combine(LOCAL_FOLDER_PATH, EVALUATOR_CONFIGURATION_NAME);
}
-}
\ No newline at end of file
+}