[REEF-1813] Add Avro message protocol serializer

   * Added a ProtocolSerializer class to REEF.Wake that automatically
     constructs the searializers and callback mechanism for all
     of the Avro messages defined in a given C# namespace.
   * Added a test case to wake tests that verify the ProtocolSerializer
     properly serializes and deserializes Avro messages going through
     a two RemoteManager connection.
   * Moved header message from bridge clr to wake where serializer lives.

JIRA:
  [REEF-1813](https://issues.apache.org/jira/browse/REEF-1813)

 Pull request:
   This closes #1321
diff --git a/lang/common/wake/avro/Header.avsc b/lang/common/wake/avro/Header.avsc
new file mode 100644
index 0000000..8efd4f5
--- /dev/null
+++ b/lang/common/wake/avro/Header.avsc
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+[
+  /*
+   * Identify the next message in the Java/C# bridge protocol.
+   */
+  {
+    "namespace":"org.apache.reef.wake.avro.message",
+    "type":"record",
+    "name":"Header",
+    "doc":"Identifies the following message in a given protocol.",
+    "fields":[
+      {
+        "name":"sequence",
+        "doc":"Sequence number of message.",
+        "type":"long"
+      },
+      {
+        "name":"className",
+        "doc":"The name of the message class.",
+        "type":"string"
+      }
+    ]
+  }
+]
diff --git a/lang/cs/AvroCodeGeneration.targets b/lang/cs/AvroCodeGeneration.targets
index 48a6695..c6599b2 100644
--- a/lang/cs/AvroCodeGeneration.targets
+++ b/lang/cs/AvroCodeGeneration.targets
@@ -131,9 +131,9 @@
   </UsingTask>
   <Target Name="SetupAvroCodeGen" DependsOnTargets="RestorePackages">
     <Message Text="Copying Avro code generation files to $(AvroBinaryDirectory)..." />
-    <Copy SourceFiles="$(AvroTools)" DestinationFolder="$(AvroBinaryDirectory)" />
-    <Copy SourceFiles="$(AvroLibrary)" DestinationFolder="$(AvroBinaryDirectory)" />
-    <Copy SourceFiles="$(NewtonsoftLibrary)" DestinationFolder="$(AvroBinaryDirectory)" />
+    <Copy SourceFiles="$(AvroTools)" DestinationFolder="$(AvroBinaryDirectory)" SkipUnchangedFiles="true" />
+    <Copy SourceFiles="$(AvroLibrary)" DestinationFolder="$(AvroBinaryDirectory)" SkipUnchangedFiles="true" />
+    <Copy SourceFiles="$(NewtonsoftLibrary)" DestinationFolder="$(AvroBinaryDirectory)" SkipUnchangedFiles="true" />
   </Target>
   <Target Name="CodeGeneration" DependsOnTargets="SetupAvroCodeGen" BeforeTargets="CoreCompile">
     <Message Text="Generating C# classes from Avro avsc files @(Compile)..." />
diff --git a/lang/cs/Org.Apache.REEF.Bridge.CLR/Org.Apache.REEF.Bridge.CLR.csproj b/lang/cs/Org.Apache.REEF.Bridge.CLR/Org.Apache.REEF.Bridge.CLR.csproj
index cc50a6f..753b088 100644
--- a/lang/cs/Org.Apache.REEF.Bridge.CLR/Org.Apache.REEF.Bridge.CLR.csproj
+++ b/lang/cs/Org.Apache.REEF.Bridge.CLR/Org.Apache.REEF.Bridge.CLR.csproj
@@ -54,7 +54,6 @@
       <Link>Properties\SharedAssemblyInfo.cs</Link>
     </Compile>
     <Compile Include="Message\Acknowledgement.cs" />
-    <Compile Include="Message\Header.cs" />
     <Compile Include="Message\Protocol.cs" />
     <Compile Include="Message\SystemOnStart.cs" />
   </ItemGroup>
@@ -89,4 +88,4 @@
   <Import Project="$(SolutionDir)\AvroCodeGeneration.Targets" Condition="Exists('$(SolutionDir)\AvroCodeGeneration.Targets')" />
   <Import Project="$(SolutionDir)\.nuget\NuGet.targets" Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" />
   <Import Project="$(PackagesDir)\StyleCop.MSBuild.$(StyleCopVersion)\build\StyleCop.MSBuild.Targets" Condition="Exists('$(PackagesDir)\StyleCop.MSBuild.$(StyleCopVersion)\build\StyleCop.MSBuild.Targets')" />
-</Project>
+</Project>
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.Bridge.CLR/packages.config b/lang/cs/Org.Apache.REEF.Bridge.CLR/packages.config
index a0941f8..9f55ae4 100644
--- a/lang/cs/Org.Apache.REEF.Bridge.CLR/packages.config
+++ b/lang/cs/Org.Apache.REEF.Bridge.CLR/packages.config
@@ -18,8 +18,6 @@
 under the License.
 -->
 <packages>
-  <package id="Microsoft.Avro.Core" version="0.1.0" targetFramework="net451" developmentDependency="true" />
-  <package id="Microsoft.Avro.Tools" version="0.1.0" targetFramework="net451" developmentDependency="true" />
   <package id="Microsoft.Hadoop.Avro" version="1.5.6" targetFramework="net45" />
   <package id="Newtonsoft.Json" version="8.0.3" targetFramework="net45" />
   <package id="StyleCop.MSBuild" version="4.7.49.1" targetFramework="net45" developmentDependency="true" />
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/Message/AvroTestMessage.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/Message/AvroTestMessage.cs
new file mode 100644
index 0000000..6da55bd
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/Message/AvroTestMessage.cs
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Runtime.Serialization;
+
+namespace org.apache.reef.wake.tests.message
+{
+    /// <summary>
+    /// Internal message header structure used to identify
+    /// the following message in the deserialization stream.
+    /// </summary>
+    [DataContract(Namespace = "org.apache.reef.wake.tests.message")]
+    public class AvroTestMessage
+    {
+        [DataMember]
+        public int number { get; set; }
+
+        [DataMember]
+        public string data { get; set; }
+
+        public AvroTestMessage()
+        {
+        }
+
+        public AvroTestMessage(int number, string data)
+        {
+            this.number = number;
+            this.data = data;
+        }
+    }
+}
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
index e9cb4c6..daf065d 100644
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
@@ -36,17 +36,24 @@
   <ItemGroup>
     <Reference Include="System" />
     <Reference Include="System.Core" />
+    <Reference Include="System.Data" />
     <Reference Include="System.Reactive.Core">
       <HintPath>$(PackagesDir)\System.Reactive.Core.$(SystemReactiveVersion)\lib\net45\System.Reactive.Core.dll</HintPath>
     </Reference>
     <Reference Include="System.Reactive.Interfaces">
       <HintPath>$(PackagesDir)\System.Reactive.Interfaces.$(SystemReactiveVersion)\lib\net45\System.Reactive.Interfaces.dll</HintPath>
     </Reference>
+    <Reference Include="System.Runtime.Serialization" />
+    <Reference Include="Microsoft.Hadoop.Avro">
+      <HintPath>$(PackagesDir)\Microsoft.Hadoop.Avro.$(AvroVersion)\lib\net45\Microsoft.Hadoop.Avro.dll</HintPath>
+    </Reference>
   </ItemGroup>
   <ItemGroup>
     <Compile Include="$(SolutionDir)\SharedAssemblyInfo.cs">
       <Link>Properties\SharedAssemblyInfo.cs</Link>
     </Compile>
+    <Compile Include="Message\AvroTestMessage.cs" />
+    <Compile Include="ProtocolSerializerTest.cs" />
     <Compile Include="ClockTest.cs" />
     <Compile Include="MultiCodecTest.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/ProtocolSerializerTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/ProtocolSerializerTest.cs
new file mode 100644
index 0000000..8f4a1ca
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/ProtocolSerializerTest.cs
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Concurrent;
+using System.Net;
+using System.Reactive;
+using Org.Apache.REEF.Wake.Avro;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using org.apache.reef.wake.tests.message;
+using Xunit;
+
+namespace Org.Apache.REEF.Wake.Tests
+{
+    /// <summary>
+    /// Observer to receive and verify test message contents.
+    /// </summary>
+    internal sealed class TestMessageObserver : IObserver<MessageInstance<AvroTestMessage>>
+    {
+        int number;
+        string data;
+
+        public TestMessageObserver(int number, string data)
+        {
+            this.number = number;
+            this.data = data;
+        }
+
+        public void OnNext(MessageInstance<AvroTestMessage> instance)
+        {
+            Assert.Equal(instance.message.number, this.number);
+            Assert.Equal(instance.message.data, this.data);
+        }
+
+        public void OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        public void OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
+    }
+
+    [Collection("FunctionalTests")]
+    public class TestProtocolSerializer
+    {
+        /// <summary>
+        /// Setup two way communication between two remote managers through the loopback
+        /// network and verify that Avro messages are properly serialized and deserialzied
+        /// by the ProtocolSerializer class.
+        /// </summary>
+        [Fact]
+        [Trait("Priority", "1")]
+        public void TestTwoWayCommunication()
+        {
+            // Test data.
+            int[] numbers = { 12, 25 };
+            string[] strings = { "The first string", "The second string" };
+
+            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+            BlockingCollection<byte[]> queue1 = new BlockingCollection<byte[]>();
+            BlockingCollection<byte[]> queue2 = new BlockingCollection<byte[]>();
+
+            ProtocolSerializer serializer = new ProtocolSerializer(this.GetType().Assembly, "org.apache.reef.wake.tests.message");
+            IRemoteManagerFactory _remoteManagerFactory = TangFactory.GetTang().NewInjector().GetInstance<IRemoteManagerFactory>();
+
+            using (var remoteManager1 = _remoteManagerFactory.GetInstance(listeningAddress, new ByteCodec()))
+            using (var remoteManager2 = _remoteManagerFactory.GetInstance(listeningAddress, new ByteCodec()))
+            {
+                // Register observers for remote manager 1 and remote manager 2
+                var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
+                var observer1 = Observer.Create<byte[]>(queue1.Add);
+                var observer2 = Observer.Create<byte[]>(queue2.Add);
+                remoteManager1.RegisterObserver(remoteEndpoint, observer1);
+                remoteManager2.RegisterObserver(remoteEndpoint, observer2);
+
+                // Remote manager 1 sends avro message to remote manager 2
+                var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+                remoteObserver1.OnNext(serializer.Write(new AvroTestMessage(numbers[0], strings[0]), 1));
+
+                // Remote manager 2 sends avro message to remote manager 1
+                var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint);
+                remoteObserver2.OnNext(serializer.Write(new AvroTestMessage(numbers[1], strings[1]), 2));
+
+                // Verify the messages are properly received.
+                serializer.Read(queue1.Take(), new TestMessageObserver(numbers[1], strings[1]));
+                serializer.Read(queue2.Take(), new TestMessageObserver(numbers[0], strings[0]));
+            }
+        }
+    }
+}
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/packages.config b/lang/cs/Org.Apache.REEF.Wake.Tests/packages.config
index 581efe6..e0d4dbf 100644
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/packages.config
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/packages.config
@@ -18,6 +18,7 @@
 under the License.
 -->
 <packages>
+  <package id="Microsoft.Hadoop.Avro" version="1.5.6" targetFramework="net45" />
   <package id="StyleCop.MSBuild" version="4.7.49.1" targetFramework="net45" developmentDependency="true" />
   <package id="System.Reactive.Core" version="3.1.1" targetFramework="net451" />
   <package id="System.Reactive.Interfaces" version="3.1.1" targetFramework="net451" />
diff --git a/lang/cs/Org.Apache.REEF.Bridge.CLR/Message/Header.cs b/lang/cs/Org.Apache.REEF.Wake/Avro/Message/Header.cs
similarity index 73%
rename from lang/cs/Org.Apache.REEF.Bridge.CLR/Message/Header.cs
rename to lang/cs/Org.Apache.REEF.Wake/Avro/Message/Header.cs
index cf0b939..318515b 100644
--- a/lang/cs/Org.Apache.REEF.Bridge.CLR/Message/Header.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Avro/Message/Header.cs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 //<auto-generated />
-namespace org.apache.reef.bridge.message
+namespace org.apache.reef.wake.avro.message
 {
     using System;
     using System.Collections.Generic;
@@ -23,12 +23,12 @@
     using Microsoft.Hadoop.Avro;
 
     /// <summary>
-    /// Used to serialize and deserialize Avro record org.apache.reef.bridge.message.Header.
+    /// Used to serialize and deserialize Avro record org.apache.reef.wake.avro.message.Header.
     /// </summary>
-    [DataContract(Namespace = "org.apache.reef.bridge.message")]
+    [DataContract(Namespace = "org.apache.reef.wake.avro.message")]
     public partial class Header
     {
-        private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.bridge.message.Header"",""doc"":""Identifies the following message in the Java/C# bridge protocol."",""fields"":[{""name"":""identifier"",""doc"":""Identifier of the next message to be read."",""type"":""long""},{""name"":""className"",""doc"":""The fully qualified name of the message class."",""type"":""string""}]}";
+        private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.wake.avro.message.Header"",""doc"":""Identifies the following message in a given protocol."",""fields"":[{""name"":""sequence"",""doc"":""Sequence number of message."",""type"":""long""},{""name"":""className"",""doc"":""The name of the message class."",""type"":""string""}]}";
 
         /// <summary>
         /// Gets the schema.
@@ -42,10 +42,10 @@
         }
       
         /// <summary>
-        /// Gets or sets the identifier field.
+        /// Gets or sets the sequence field.
         /// </summary>
         [DataMember]
-        public long identifier { get; set; }
+        public long sequence { get; set; }
               
         /// <summary>
         /// Gets or sets the className field.
@@ -63,11 +63,11 @@
         /// <summary>
         /// Initializes a new instance of the <see cref="Header"/> class.
         /// </summary>
-        /// <param name="identifier">The identifier.</param>
+        /// <param name="sequence">The sequence.</param>
         /// <param name="className">The className.</param>
-        public Header(long identifier, string className)
+        public Header(long sequence, string className)
         {
-            this.identifier = identifier;
+            this.sequence = sequence;
             this.className = className;
         }
     }
diff --git a/lang/cs/Org.Apache.REEF.Wake/Avro/MessageInstance.cs b/lang/cs/Org.Apache.REEF.Wake/Avro/MessageInstance.cs
new file mode 100644
index 0000000..96ac8a4
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Avro/MessageInstance.cs
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License
+
+namespace Org.Apache.REEF.Wake.Avro
+{
+    /// <summary>
+    /// Wrapper class to bind a specific instance of an Avro messagage
+    /// with it associated sequence number.
+    /// </summary>
+    /// <typeparam name="T"></typeparam>
+    public struct MessageInstance<T>
+    {
+        public long sequence;
+        public T message;
+
+        public MessageInstance(long sequence, T message)
+        {
+            this.sequence = sequence;
+            this.message = message;
+        }
+    }
+}
diff --git a/lang/cs/Org.Apache.REEF.Wake/Avro/ProtocolSerializer.cs b/lang/cs/Org.Apache.REEF.Wake/Avro/ProtocolSerializer.cs
new file mode 100644
index 0000000..dff30b1
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Avro/ProtocolSerializer.cs
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Reflection;
+using Microsoft.Hadoop.Avro;
+using Org.Apache.REEF.Utilities.Logging;
+using org.apache.reef.wake.avro.message;
+
+namespace Org.Apache.REEF.Wake.Avro
+{
+    /// <summary>
+    /// Given a namespace of Avro messages which represent a protocol, the ProtocolSerailizer
+    /// class reflects all of the message classes and builds Avro seriailziers and deserializers.
+    /// Avro messages are then serialized/deserilaized to and from byte[] arrays using the
+    /// Read/Write methods. A transport such as a RemoteObserver using a ByteCodec can then
+    /// be used to send and receive the serialized messages.
+    /// </summary>
+    public sealed class ProtocolSerializer
+    {
+        private static readonly Logger Logr = Logger.GetLogger(typeof(ProtocolSerializer));
+
+        // Delagates for message serializers and deserializers.
+        private delegate void Serialize(MemoryStream stream, object message);
+        private delegate void Deserialize(MemoryStream stream, object observer, long sequence);
+
+        // Message type to serialize/derserialize delagate.
+        private readonly SortedDictionary<string, Serialize> serializeMap = new SortedDictionary<string, Serialize>();
+        private readonly SortedDictionary<string, Deserialize> deserializeMap = new SortedDictionary<string, Deserialize>();
+
+        private readonly IAvroSerializer<Header> headerSerializer = AvroSerializer.Create<Header>();
+
+        /// <summary>
+        /// Register all of the protocol messages using reflection.
+        /// </summary>
+        /// <param name="assembly">The Assembley object which contains the namespace of the message classes.</param>
+        /// <param name="messageNamespace">A string which contains the namespace the protocol messages.</param>
+        public ProtocolSerializer(Assembly assembly, string messageNamespace)
+        {
+            MethodInfo registerInfo = typeof(ProtocolSerializer).GetMethod("Register", BindingFlags.Instance | BindingFlags.NonPublic);
+            MethodInfo genericInfo;
+
+            Logr.Log(Level.Info, "Retrieving types for assembly: {0}", assembly.FullName);
+            List<Type> types = new List<Type>(assembly.GetTypes());
+            types.Add(typeof(Header));
+
+            foreach (Type type in types)
+            {
+                string name = type.FullName;
+                if (name.StartsWith(messageNamespace))
+                {
+                    genericInfo = registerInfo.MakeGenericMethod(new[] { type });
+                    genericInfo.Invoke(this, null);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Generate and store the metadata necessary to serialze and deserialize a specific message type.
+        /// </summary>
+        /// <typeparam name="TMessage">The class type of the message being registered.</typeparam>
+        internal void Register<TMessage>()
+        {
+            Logr.Log(Level.Info, "Registering message type: {0} {1}", typeof(TMessage).FullName, typeof(TMessage).Name);
+
+            IAvroSerializer<TMessage> messageSerializer = AvroSerializer.Create<TMessage>();
+            Serialize serialize = (MemoryStream stream, object message) =>
+            {
+                messageSerializer.Serialize(stream, (TMessage)message);
+            };
+            serializeMap.Add(typeof(TMessage).Name, serialize);
+
+            Deserialize deserialize = (MemoryStream stream, object observer, long sequence) =>
+            {
+                TMessage message = messageSerializer.Deserialize(stream);
+                IObserver<MessageInstance<TMessage>> msgObserver = observer as IObserver<MessageInstance<TMessage>>;
+                if (msgObserver != null)
+                {
+                    msgObserver.OnNext(new MessageInstance<TMessage>(sequence, message));
+                }
+                else
+                {
+                    Logr.Log(Level.Warning, "Unhandled message received: {0}", message);
+                }
+            };
+            deserializeMap.Add(typeof(TMessage).Name, deserialize);
+        }
+
+        /// <summary>
+        /// Serialize the input message and return a byte array.
+        /// </summary>
+        /// <param name="message">An object reference to the messeage to be serialized</param>
+        /// <param name="sequence">A long which cotains the higher level protocols sequence number for the message.</param>
+        /// <returns>A byte array containing the serialized header and message.</returns>
+        public byte[] Write(object message, long sequence) 
+        {
+            string name = message.GetType().Name;
+            Logr.Log(Level.Info, "Serializing message: {0}", name);
+            try
+            { 
+                using (MemoryStream stream = new MemoryStream())
+                {
+                    Header header = new Header(sequence, name);
+                    headerSerializer.Serialize(stream, header);
+
+                    Serialize serialize;
+                    if (serializeMap.TryGetValue(name, out serialize))
+                    {
+                        serialize(stream, message);
+                    }
+                    else
+                    {
+                        throw new SeializationException("Request to serialize unknown message type: " + name);
+                    }
+                    return stream.GetBuffer();
+                }
+            }
+            catch (Exception e)
+            {
+                Logr.Log(Level.Error, "Failure writing message.", e);
+                throw e;
+            }
+        }
+
+        /// <summary>
+        /// Read a message from the input byte array.
+        /// </summary>
+        /// <param name="data">The byte array containing a header message and message to be deserialized.</param>
+        /// <param name="observer">An object which implements the IObserver<> interface for the message being deserialized.</param>
+        public void Read(byte[] data, object observer)
+        {
+            try
+            {
+                using (MemoryStream stream = new MemoryStream(data))
+                {
+                    Header head = headerSerializer.Deserialize(stream);
+                    Deserialize deserialize;
+                    if (deserializeMap.TryGetValue(head.className, out deserialize))
+                    {
+                        deserialize(stream, observer, head.sequence);
+                    }
+                    else
+                    {
+                        throw new SeializationException("Request to deserialize unknown message type: " + head.className);
+                    }
+                }
+            }
+            catch (Exception e)
+            {
+                Logr.Log(Level.Error, "Failure reading message.", e);
+                throw e;
+            }
+        }
+    }
+}
diff --git a/lang/cs/Org.Apache.REEF.Wake/Avro/SeializationException.cs b/lang/cs/Org.Apache.REEF.Wake/Avro/SeializationException.cs
new file mode 100644
index 0000000..4ecc4d9
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Avro/SeializationException.cs
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License
+
+using System;
+
+namespace Org.Apache.REEF.Wake.Avro
+{
+    /// <summary>
+    /// The SeializationException is generated when an Avro serializer is
+    /// requested to send or receive a message that does not exist is in
+    /// message namespace it was given at construction time.
+    /// </summary>
+    class SeializationException : Exception
+    {
+        public SeializationException(string message) : base(message)
+        {
+        }
+    }
+}
diff --git a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
index a380400..9dcc039 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
+++ b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
@@ -27,9 +27,17 @@
     <FileAlignment>512</FileAlignment>
     <RestorePackages>true</RestorePackages>
     <SolutionDir Condition="$(SolutionDir) == '' Or $(SolutionDir) == '*Undefined*'">..</SolutionDir>
+    <AvroBinaryDirectory>..\packages\AvroBin</AvroBinaryDirectory>
+    <AvroSchemaDirectory>..\..\common\wake\avro</AvroSchemaDirectory>
+    <AvroTools>..\packages\Microsoft.Avro.Tools.0.1.0\lib\net451\Microsoft.Avro.Tools.exe</AvroTools>
+    <AvroLibrary>..\packages\Microsoft.Avro.Core.0.1.0\lib\net451\Microsoft.Avro.Core.dll</AvroLibrary>
+    <NewtonsoftLibrary>..\packages\Newtonsoft.Json.10.0.3\lib\net45\Newtonsoft.Json.dll</NewtonsoftLibrary>
   </PropertyGroup>
   <Import Project="$(SolutionDir)\build.props" />
   <ItemGroup>
+    <Reference Include="Microsoft.Hadoop.Avro">
+      <HintPath>$(PackagesDir)\Microsoft.Hadoop.Avro.$(AvroVersion)\lib\net45\Microsoft.Hadoop.Avro.dll</HintPath>
+    </Reference>
     <Reference Include="Microsoft.Practices.TransientFaultHandling.Core, Version=5.1.1209.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
       <HintPath>$(PackagesDir)\TransientFaultHandling.Core.5.1.1209.1\lib\NET4\Microsoft.Practices.TransientFaultHandling.Core.dll</HintPath>
     </Reference>
@@ -38,6 +46,7 @@
     </Reference>
     <Reference Include="System" />
     <Reference Include="System.Core" />
+    <Reference Include="System.Data" />
     <Reference Include="System.Reactive.Core">
       <HintPath>$(PackagesDir)\System.Reactive.Core.$(SystemReactiveVersion)\lib\net45\System.Reactive.Core.dll</HintPath>
       <Private>True</Private>
@@ -45,6 +54,7 @@
     <Reference Include="System.Reactive.Interfaces">
       <HintPath>$(PackagesDir)\System.Reactive.Interfaces.$(SystemReactiveVersion)\lib\net45\System.Reactive.Interfaces.dll</HintPath>
     </Reference>
+    <Reference Include="System.Runtime.Serialization" />
     <Reference Include="System.Xml" />
   </ItemGroup>
   <ItemGroup>
@@ -52,6 +62,10 @@
       <Link>Properties\SharedAssemblyInfo.cs</Link>
     </Compile>
     <Compile Include="AbstractEStage.cs" />
+    <Compile Include="Avro\MessageInstance.cs" />
+    <Compile Include="Avro\Message\Header.cs" />
+    <Compile Include="Avro\ProtocolSerializer.cs" />
+    <Compile Include="Avro\SeializationException.cs" />
     <Compile Include="Examples\P2p\IEventSource.cs" />
     <Compile Include="Examples\P2p\Pull2Push.cs" />
     <Compile Include="IEStage.cs" />
@@ -205,5 +219,6 @@
   </ItemGroup>
   <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
   <Import Project="$(SolutionDir)\.nuget\NuGet.targets" Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" />
+  <Import Project="$(SolutionDir)\AvroCodeGeneration.Targets" Condition="Exists('$(SolutionDir)\AvroCodeGeneration.Targets')" />
   <Import Project="$(PackagesDir)\StyleCop.MSBuild.$(StyleCopVersion)\build\StyleCop.MSBuild.Targets" Condition="Exists('$(PackagesDir)\StyleCop.MSBuild.$(StyleCopVersion)\build\StyleCop.MSBuild.Targets')" />
 </Project>
diff --git a/lang/cs/Org.Apache.REEF.Wake/packages.config b/lang/cs/Org.Apache.REEF.Wake/packages.config
index e176d57..beb1da1 100644
--- a/lang/cs/Org.Apache.REEF.Wake/packages.config
+++ b/lang/cs/Org.Apache.REEF.Wake/packages.config
@@ -18,6 +18,10 @@
 under the License.
 -->
 <packages>
+  <package id="Microsoft.Avro.Core" version="0.1.0" targetFramework="net451" developmentDependency="true" />
+  <package id="Microsoft.Avro.Tools" version="0.1.0" targetFramework="net451" developmentDependency="true" />
+  <package id="Newtonsoft.Json" version="10.0.3" targetFramework="net451" developmentDependency="true" />
+  <package id="Microsoft.Hadoop.Avro" version="1.5.6" targetFramework="net45" />
   <package id="protobuf-net" version="2.0.0.668" targetFramework="net45" />
   <package id="StyleCop.MSBuild" version="4.7.49.1" targetFramework="net45" developmentDependency="true" />
   <package id="System.Reactive.Core" version="3.1.1" targetFramework="net451" />
diff --git a/lang/cs/nuget.config b/lang/cs/nuget.config
index ddd79f7..79317f3 100644
--- a/lang/cs/nuget.config
+++ b/lang/cs/nuget.config
@@ -19,6 +19,7 @@
 -->
 <configuration>
     <packageSources>
+        <add key="nuget.org" value="https://www.nuget.org/api/V2" />
         <add key="dotnet" value="https://dotnet.myget.org/F/dotnet-core/api/v3/index.json" />
     </packageSources>
 </configuration>