[REEF-1939] Implement network transport for the Java-CLR bridge in C#

Summary of changes:
   * Implement `NetworkTransport` wrapper and `LocalObserver` handler for incoming messages
   * Add Java bridge endpoint file name to `REEFFileNames` constants
   * Refactor the Avro protocol for the bridge
   * Minor refactoring and code cleanup
   * The bulk of the classes in REEF-1939, 1938, and 1936
     were pulled from an uncommited pull request for REEF-1763
     implemented by Doug Service.

JIRA: [REEF-1939](https://issues.apache.org/jira/browse/REEF-1939)

Pull Request:
  Closes #1406
diff --git a/lang/cs/Org.Apache.REEF.Bridge.CLR/LocalObserver.cs b/lang/cs/Org.Apache.REEF.Bridge.CLR/LocalObserver.cs
new file mode 100644
index 0000000..d76e937
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Bridge.CLR/LocalObserver.cs
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations.InjectionPlan;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Avro;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Bridge
+{
+    /// <summary>
+    /// The Local Observer class receives byte buffer messages from the transport layer,
+    /// deserializes the messages into Avro C# classes, and invokes the appropriate
+    /// IObserver callback on the Avro message observer.
+    /// </summary>
+    public sealed class LocalObserver : IObserver<IRemoteMessage<byte[]>>
+    {
+        /// <summary>
+        /// Specify the class that will be called to process deserialied Avro messages.
+        /// </summary>
+        [NamedParameter(Documentation =
+            "Must implement IObserver<IMessageInstance<TMessage>> for messages to be received.")]
+        public class MessageObserver : Name<object>
+        {
+        }
+
+        private static readonly Logger Logger = Logger.GetLogger(typeof(LocalObserver));
+
+        private readonly ProtocolSerializer _serializer;
+        private readonly IInjectionFuture<object> _fMessageObserver;
+
+        /// <param name="serializer">The protocol serializer instance to be used.</param>
+        /// <param name="fMessageObserver">An injection future with message observer to be
+        /// called to process Avro messages from the Java bridge.</param>
+        [Inject]
+        private LocalObserver(
+            ProtocolSerializer serializer,
+            [Parameter(typeof(MessageObserver))] IInjectionFuture<object> fMessageObserver)
+        {
+            _serializer = serializer;
+            _fMessageObserver = fMessageObserver;
+        }
+
+        /// <summary>
+        /// Called by the remote manager to process messages received from the java bridge.
+        /// </summary>
+        /// <param name="message">A byte buffer containing a serialzied message.</param>
+        public void OnNext(IRemoteMessage<byte[]> message)
+        {
+            Logger.Log(Level.Verbose, "Message received: {0}", message.Identifier);
+
+            // Deserialize the message and invoke the appropriate handler.
+            _serializer.Read(message.Message, _fMessageObserver.Get());
+        }
+
+        /// <summary>
+        /// Handles error conditions in the low transport layer.
+        /// </summary>
+        /// <param name="error">The exception generated in the transport layer.</param>
+        public void OnError(Exception error)
+        {
+            Logger.Log(Level.Error, "Error in the transport layer", error);
+        }
+
+        /// <summary>
+        /// Notification that no more message processing is required.
+        /// </summary>
+        public void OnCompleted()
+        {
+            Logger.Log(Level.Info, "Completed");
+        }
+    }
+}
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.Bridge.CLR/Message/BridgeInterop.cs b/lang/cs/Org.Apache.REEF.Bridge.CLR/Message/BridgeInterop.cs
deleted file mode 100644
index 2e862a7..0000000
--- a/lang/cs/Org.Apache.REEF.Bridge.CLR/Message/BridgeInterop.cs
+++ /dev/null
@@ -1,83 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-//<auto-generated />
-namespace org.apache.reef.bridge.message
-{
-    using System;
-    using System.Collections.Generic;
-    using System.Runtime.Serialization;
-    using Microsoft.Hadoop.Avro;
-
-    /// <summary>
-    /// Used to serialize and deserialize Avro record org.apache.reef.bridge.message.BridgeInterop.
-    /// </summary>
-    [DataContract(Namespace = "org.apache.reef.bridge.message")]
-    public partial class BridgeInterop
-    {
-        private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.bridge.message.BridgeInterop"",""doc"":""Container message for all Java/CLR bridge messages in the protocol."",""fields"":[{""name"":""sequence"",""doc"":""The unique squence identifier of the message in the protocol stream."",""type"":""long""},{""name"":""messageType"",""doc"":""The type of the bridge Java/CLR interop message."",""type"":{""type"":""enum"",""name"":""org.apache.reef.bridge.message.MessageType"",""doc"":""An enumeration of all possible Java/C# bridge protocol messages."",""symbols"":[""SetupBridge"",""SystemOnStart"",""Acknowledgement""]}},{""name"":""message"",""doc"":""A union which contains the actual message."",""type"":[{""type"":""record"",""name"":""org.apache.reef.bridge.message.SetupBridge"",""doc"":""Notify the C# bridge of the http port of the Java bridge webserver."",""fields"":[{""name"":""httpServerPortNumber"",""doc"":""The Java bridge http server port number."",""type"":""int""}]},{""type"":""record"",""name"":""org.apache.reef.bridge.message.SystemOnStart"",""doc"":""Notify the C# bridge the system is now running."",""fields"":[{""name"":""dateTime"",""doc"":""Date time in seconds as a long since January 1, 1970"",""type"":""long""}]},{""type"":""record"",""name"":""org.apache.reef.bridge.message.Acknowledgement"",""doc"":""The Acknowledgement message is sent to the Java bridge to acknowledge receipt and processing of a specific message."",""fields"":[{""name"":""messageIdentifier"",""doc"":""The message identifier of the message that was successfully processed."",""type"":""long""}]},{""type"":""array"",""items"":""bytes""}]}]}";
-
-        /// <summary>
-        /// Gets the schema.
-        /// </summary>
-        public static string Schema
-        {
-            get
-            {
-                return JsonSchema;
-            }
-        }
-      
-        /// <summary>
-        /// Gets or sets the sequence field.
-        /// </summary>
-        [DataMember]
-        public long sequence { get; set; }
-              
-        /// <summary>
-        /// Gets or sets the messageType field.
-        /// </summary>
-        [DataMember]
-        public org.apache.reef.bridge.message.MessageType messageType { get; set; }
-              
-        /// <summary>
-        /// Gets or sets the message field.
-        /// </summary>
-        [DataMember]
-        [AvroUnion(typeof(org.apache.reef.bridge.message.SetupBridge), typeof(org.apache.reef.bridge.message.SystemOnStart), typeof(org.apache.reef.bridge.message.Acknowledgement), typeof(List<byte[]>))]
-        public object message { get; set; }
-                
-        /// <summary>
-        /// Initializes a new instance of the <see cref="BridgeInterop"/> class.
-        /// </summary>
-        public BridgeInterop()
-        {
-        }
-
-        /// <summary>
-        /// Initializes a new instance of the <see cref="BridgeInterop"/> class.
-        /// </summary>
-        /// <param name="sequence">The sequence.</param>
-        /// <param name="messageType">The messageType.</param>
-        /// <param name="message">The message.</param>
-        public BridgeInterop(long sequence, org.apache.reef.bridge.message.MessageType messageType, object message)
-        {
-            this.sequence = sequence;
-            this.messageType = messageType;
-            this.message = message;
-        }
-    }
-}
diff --git a/lang/cs/Org.Apache.REEF.Bridge.CLR/Message/Protocol.cs b/lang/cs/Org.Apache.REEF.Bridge.CLR/Message/BridgeProtocol.cs
similarity index 77%
rename from lang/cs/Org.Apache.REEF.Bridge.CLR/Message/Protocol.cs
rename to lang/cs/Org.Apache.REEF.Bridge.CLR/Message/BridgeProtocol.cs
index 2f88621..deab9df 100644
--- a/lang/cs/Org.Apache.REEF.Bridge.CLR/Message/Protocol.cs
+++ b/lang/cs/Org.Apache.REEF.Bridge.CLR/Message/BridgeProtocol.cs
@@ -23,12 +23,12 @@
     using Microsoft.Hadoop.Avro;
 
     /// <summary>
-    /// Used to serialize and deserialize Avro record org.apache.reef.bridge.message.Protocol.
+    /// Used to serialize and deserialize Avro record org.apache.reef.bridge.message.BridgeProtocol.
     /// </summary>
     [DataContract(Namespace = "org.apache.reef.bridge.message")]
-    public partial class Protocol
+    public partial class BridgeProtocol
     {
-        private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.bridge.message.Protocol"",""doc"":""Negotiate Java/C# bridge protocol messages."",""fields"":[{""name"":""offset"",""doc"":""The index offset of the message identifiers."",""type"":""int""}]}";
+        private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.bridge.message.BridgeProtocol"",""doc"":""Negotiate Java/C# bridge protocol messages."",""fields"":[{""name"":""offset"",""doc"":""The index offset of the message identifiers."",""type"":""int""}]}";
 
         /// <summary>
         /// Gets the schema.
@@ -48,17 +48,17 @@
         public int offset { get; set; }
                 
         /// <summary>
-        /// Initializes a new instance of the <see cref="Protocol"/> class.
+        /// Initializes a new instance of the <see cref="BridgeProtocol"/> class.
         /// </summary>
-        public Protocol()
+        public BridgeProtocol()
         {
         }
 
         /// <summary>
-        /// Initializes a new instance of the <see cref="Protocol"/> class.
+        /// Initializes a new instance of the <see cref="BridgeProtocol"/> class.
         /// </summary>
         /// <param name="offset">The offset.</param>
-        public Protocol(int offset)
+        public BridgeProtocol(int offset)
         {
             this.offset = offset;
         }
diff --git a/lang/cs/Org.Apache.REEF.Bridge.CLR/Message/MessageType.cs b/lang/cs/Org.Apache.REEF.Bridge.CLR/Message/MessageType.cs
deleted file mode 100644
index be59777..0000000
--- a/lang/cs/Org.Apache.REEF.Bridge.CLR/Message/MessageType.cs
+++ /dev/null
@@ -1,47 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-//<auto-generated />
-namespace org.apache.reef.bridge.message
-{
-    using System.Runtime.Serialization;
-
-    /// <summary>
-    /// Used to serialize and deserialize Avro enum org.apache.reef.bridge.message.MessageType.
-    /// </summary>
-    [DataContract(Namespace = "org.apache.reef.bridge.message")]
-    public enum MessageType
-    {
-        /// <summary>
-        /// The SetupBridge symbol.
-        /// </summary>
-        [EnumMember]
-        SetupBridge,
-
-        /// <summary>
-        /// The SystemOnStart symbol.
-        /// </summary>
-        [EnumMember]
-        SystemOnStart,
-
-        /// <summary>
-        /// The Acknowledgement symbol.
-        /// </summary>
-        [EnumMember]
-        Acknowledgement,
-
-    }
-}
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.Bridge.CLR/NetworkTransport.cs b/lang/cs/Org.Apache.REEF.Bridge.CLR/NetworkTransport.cs
new file mode 100644
index 0000000..ed23a90
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Bridge.CLR/NetworkTransport.cs
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Concurrent;
+using System.IO;
+using System.Net;
+using org.apache.reef.bridge.message;
+using Org.Apache.REEF.Common.Files;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Avro;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+
+namespace Org.Apache.REEF.Bridge
+{
+    /// <summary>
+    /// The CLR Bridge Network class aggregates a RemoteManager and
+    /// Protocol Serializer to provide a simple send/receive interface
+    /// between the CLR and Java bridges.
+    /// </summary>
+    public sealed class NetworkTransport
+    {
+        private static readonly Logger Logger = Logger.GetLogger(typeof(NetworkTransport));
+
+        private readonly BlockingCollection<byte[]> _queue = new BlockingCollection<byte[]>();
+
+        private readonly ProtocolSerializer _serializer;
+        private readonly IRemoteManager<byte[]> _remoteManager;
+        private readonly IObserver<byte[]> _remoteObserver;
+        private readonly REEFFileNames _fileNames;
+
+        /// <summary>
+        /// Construct a network stack using the wake remote manager.
+        /// </summary>
+        /// <param name="localAddressProvider">An address provider used to obtain
+        /// a local IP address on an open port.</param>
+        /// <param name="serializer">Serializer/deserializer of the bridge messages.</param>
+        /// <param name="localObserver">Handler of the incoming bridge messages.</param>
+        /// <param name="remoteManagerFactory">RemoteManager factory.
+        /// We need a new instance of the RM to communicate with the Java side of the bridge.</param>
+        /// <param name="filenames">Collection of global constants for file paths and such.</param>
+        [Inject]
+        private NetworkTransport(
+            ILocalAddressProvider localAddressProvider,
+            ProtocolSerializer serializer,
+            LocalObserver localObserver,
+            IRemoteManagerFactory remoteManagerFactory,
+            REEFFileNames fileNames)
+        {
+            _serializer = serializer;
+            _fileNames = fileNames;
+
+            // Instantiate the remote manager.
+            _remoteManager = remoteManagerFactory.GetInstance(localAddressProvider.LocalAddress, new ByteCodec());
+
+            // Listen to the java bridge on the local end point.
+            _remoteManager.RegisterObserver(localObserver);
+            Logger.Log(Level.Info, "Local observer listening to java bridge on: [{0}]", _remoteManager.LocalEndpoint);
+
+            // Instantiate a remote observer to send messages to the java bridge.
+            IPEndPoint javaIpEndPoint = GetJavaBridgeEndpoint();
+            Logger.Log(Level.Info, "Connecting to java bridge on: [{0}]", javaIpEndPoint);
+            _remoteObserver = _remoteManager.GetRemoteObserver(javaIpEndPoint);
+
+            // Negotiate the protocol.
+            Send(0, new BridgeProtocol(100));
+        }
+
+        /// <summary>
+        /// Send a message to the java side of the bridge.
+        /// </summary>
+        /// <param name="identifier">A long value that which is the unique sequence identifier of the message.</param>
+        /// <param name="message">An object reference to a message in the org.apache.reef.bridge.message package.</param>
+        public void Send(long identifier, object message)
+        {
+            Logger.Log(Level.Verbose, "Sending message: {0}", message);
+            _remoteObserver.OnNext(_serializer.Write(message, identifier));
+        }
+
+        /// <summary>
+        /// Retrieves the address of the java bridge.
+        /// </summary>
+        /// <returns>IP address and port of the Java bridge.</returns>
+        private IPEndPoint GetJavaBridgeEndpoint()
+        {
+            using (FileStream stream = File.Open(_fileNames.DriverJavaBridgeEndpointFileName, FileMode.Open))
+            {
+                using (StreamReader reader = new StreamReader(stream))
+                {
+                    string javaBridgeAddress = reader.ReadToEnd();
+                    Logger.Log(Level.Info, "Java bridge address: {0}", javaBridgeAddress);
+
+                    string[] javaAddressStrs = javaBridgeAddress.Split(':');
+                    IPAddress javaBridgeIpAddress = IPAddress.Parse(javaAddressStrs[0]);
+                    int port = int.Parse(javaAddressStrs[1]);
+                    return new IPEndPoint(javaBridgeIpAddress, port);
+                }
+            }
+        }
+    }
+}
diff --git a/lang/cs/Org.Apache.REEF.Bridge.CLR/Org.Apache.REEF.Bridge.CLR.csproj b/lang/cs/Org.Apache.REEF.Bridge.CLR/Org.Apache.REEF.Bridge.CLR.csproj
index dbbe571..20500c3 100644
--- a/lang/cs/Org.Apache.REEF.Bridge.CLR/Org.Apache.REEF.Bridge.CLR.csproj
+++ b/lang/cs/Org.Apache.REEF.Bridge.CLR/Org.Apache.REEF.Bridge.CLR.csproj
@@ -53,12 +53,12 @@
     <Compile Include="$(SolutionDir)\SharedAssemblyInfo.cs">
       <Link>Properties\SharedAssemblyInfo.cs</Link>
     </Compile>
+    <Compile Include="LocalObserver.cs" />
     <Compile Include="Message\Acknowledgement.cs" />
-    <Compile Include="Message\BridgeInterop.cs" />
-    <Compile Include="Message\MessageType.cs" />
-    <Compile Include="Message\Protocol.cs" />
+    <Compile Include="Message\BridgeProtocol.cs" />
     <Compile Include="Message\SetupBridge.cs" />
     <Compile Include="Message\SystemOnStart.cs" />
+    <Compile Include="NetworkTransport.cs" />
   </ItemGroup>
   <ItemGroup>
     <None Include="Message\README.md" />
@@ -92,4 +92,4 @@
   <Import Project="$(SolutionDir)\AvroCodeGeneration.Targets" Condition="Exists('$(SolutionDir)\AvroCodeGeneration.Targets')" />
   <Import Project="$(SolutionDir)\.nuget\NuGet.targets" Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" />
   <Import Project="$(PackagesDir)\StyleCop.MSBuild.$(StyleCopVersion)\build\StyleCop.MSBuild.Targets" Condition="Exists('$(PackagesDir)\StyleCop.MSBuild.$(StyleCopVersion)\build\StyleCop.MSBuild.Targets')" />
-</Project>
\ No newline at end of file
+</Project>
diff --git a/lang/cs/Org.Apache.REEF.Common/Files/REEFFileNames.cs b/lang/cs/Org.Apache.REEF.Common/Files/REEFFileNames.cs
index 5cb272e..4c21d1e 100644
--- a/lang/cs/Org.Apache.REEF.Common/Files/REEFFileNames.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Files/REEFFileNames.cs
@@ -50,6 +50,7 @@
         private const string CLR_DRIVER_CONFIGURATION_NAME = "clrdriver.conf";
         private const string CLR_BRIDGE_CONFIGURATION_NAME = "clrBridge.config";
         private const string DRIVER_HTTP_ENDPOINT_FILE_NAME = "DriverHttpEndpoint.txt";
+        private const string DRIVER_JAVA_BRIDGE_ENDPOINT_FILE_NAME = "DriverJavaBridgeEndpoint.txt";
         private const string BRIDGE_EXE_NAME = "Org.Apache.REEF.Bridge.exe";
         private const string BRIDGE_EXE_CONFIG_NAME = "Org.Apache.REEF.Bridge.exe.config";
         private const string SECURITY_TOKEN_IDENTIFIER_FILE = "SecurityTokenId";
@@ -338,12 +339,19 @@
         }
 
         /// <summary>
+        /// Name of the file that contains the driver name server address and port.
         /// </summary>
+        /// <returns>File name that contains the dfs path for the DriverNameServerEndpoint</returns>
+        public string DriverJavaBridgeEndpointFileName
+        {
+            get { return DRIVER_JAVA_BRIDGE_ENDPOINT_FILE_NAME; }
+        }
+
         /// <returns>File name that contains the dfs path for the DriverHttpEndpoint</returns>
         [Unstable("0.13", "Working in progress for what to return after submit")]
-        public string DriverHttpEndpoint 
-        { 
-            get { return DRIVER_HTTP_ENDPOINT_FILE_NAME; } 
+        public string DriverHttpEndpoint
+        {
+            get { return DRIVER_HTTP_ENDPOINT_FILE_NAME; }
         }
 
         private static readonly string GLOBAL_FOLDER_PATH = Path.Combine(REEF_BASE_FOLDER, GLOBAL_FOLDER);
@@ -358,4 +366,4 @@
         private static readonly string EVALUATOR_CONFIGURATION_PATH =
             Path.Combine(LOCAL_FOLDER_PATH, EVALUATOR_CONFIGURATION_NAME);
     }
-}
\ No newline at end of file
+}