[REEF-1813] Add Avro message protocol serializer
* Added a ProtocolSerializer class to REEF.Wake that automatically
constructs the searializers and callback mechanism for all
of the Avro messages defined in a given C# namespace.
* Added a test case to wake tests that verify the ProtocolSerializer
properly serializes and deserializes Avro messages going through
a two RemoteManager connection.
* Moved header message from bridge clr to wake where serializer lives.
JIRA:
[REEF-1813](https://issues.apache.org/jira/browse/REEF-1813)
Pull request:
This closes #1321
diff --git a/lang/common/wake/avro/Header.avsc b/lang/common/wake/avro/Header.avsc
new file mode 100644
index 0000000..8efd4f5
--- /dev/null
+++ b/lang/common/wake/avro/Header.avsc
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+[
+ /*
+ * Identify the next message in the Java/C# bridge protocol.
+ */
+ {
+ "namespace":"org.apache.reef.wake.avro.message",
+ "type":"record",
+ "name":"Header",
+ "doc":"Identifies the following message in a given protocol.",
+ "fields":[
+ {
+ "name":"sequence",
+ "doc":"Sequence number of message.",
+ "type":"long"
+ },
+ {
+ "name":"className",
+ "doc":"The name of the message class.",
+ "type":"string"
+ }
+ ]
+ }
+]
diff --git a/lang/cs/AvroCodeGeneration.targets b/lang/cs/AvroCodeGeneration.targets
index 48a6695..c6599b2 100644
--- a/lang/cs/AvroCodeGeneration.targets
+++ b/lang/cs/AvroCodeGeneration.targets
@@ -131,9 +131,9 @@
</UsingTask>
<Target Name="SetupAvroCodeGen" DependsOnTargets="RestorePackages">
<Message Text="Copying Avro code generation files to $(AvroBinaryDirectory)..." />
- <Copy SourceFiles="$(AvroTools)" DestinationFolder="$(AvroBinaryDirectory)" />
- <Copy SourceFiles="$(AvroLibrary)" DestinationFolder="$(AvroBinaryDirectory)" />
- <Copy SourceFiles="$(NewtonsoftLibrary)" DestinationFolder="$(AvroBinaryDirectory)" />
+ <Copy SourceFiles="$(AvroTools)" DestinationFolder="$(AvroBinaryDirectory)" SkipUnchangedFiles="true" />
+ <Copy SourceFiles="$(AvroLibrary)" DestinationFolder="$(AvroBinaryDirectory)" SkipUnchangedFiles="true" />
+ <Copy SourceFiles="$(NewtonsoftLibrary)" DestinationFolder="$(AvroBinaryDirectory)" SkipUnchangedFiles="true" />
</Target>
<Target Name="CodeGeneration" DependsOnTargets="SetupAvroCodeGen" BeforeTargets="CoreCompile">
<Message Text="Generating C# classes from Avro avsc files @(Compile)..." />
diff --git a/lang/cs/Org.Apache.REEF.Bridge.CLR/Org.Apache.REEF.Bridge.CLR.csproj b/lang/cs/Org.Apache.REEF.Bridge.CLR/Org.Apache.REEF.Bridge.CLR.csproj
index cc50a6f..753b088 100644
--- a/lang/cs/Org.Apache.REEF.Bridge.CLR/Org.Apache.REEF.Bridge.CLR.csproj
+++ b/lang/cs/Org.Apache.REEF.Bridge.CLR/Org.Apache.REEF.Bridge.CLR.csproj
@@ -54,7 +54,6 @@
<Link>Properties\SharedAssemblyInfo.cs</Link>
</Compile>
<Compile Include="Message\Acknowledgement.cs" />
- <Compile Include="Message\Header.cs" />
<Compile Include="Message\Protocol.cs" />
<Compile Include="Message\SystemOnStart.cs" />
</ItemGroup>
@@ -89,4 +88,4 @@
<Import Project="$(SolutionDir)\AvroCodeGeneration.Targets" Condition="Exists('$(SolutionDir)\AvroCodeGeneration.Targets')" />
<Import Project="$(SolutionDir)\.nuget\NuGet.targets" Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" />
<Import Project="$(PackagesDir)\StyleCop.MSBuild.$(StyleCopVersion)\build\StyleCop.MSBuild.Targets" Condition="Exists('$(PackagesDir)\StyleCop.MSBuild.$(StyleCopVersion)\build\StyleCop.MSBuild.Targets')" />
-</Project>
+</Project>
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.Bridge.CLR/packages.config b/lang/cs/Org.Apache.REEF.Bridge.CLR/packages.config
index a0941f8..9f55ae4 100644
--- a/lang/cs/Org.Apache.REEF.Bridge.CLR/packages.config
+++ b/lang/cs/Org.Apache.REEF.Bridge.CLR/packages.config
@@ -18,8 +18,6 @@
under the License.
-->
<packages>
- <package id="Microsoft.Avro.Core" version="0.1.0" targetFramework="net451" developmentDependency="true" />
- <package id="Microsoft.Avro.Tools" version="0.1.0" targetFramework="net451" developmentDependency="true" />
<package id="Microsoft.Hadoop.Avro" version="1.5.6" targetFramework="net45" />
<package id="Newtonsoft.Json" version="8.0.3" targetFramework="net45" />
<package id="StyleCop.MSBuild" version="4.7.49.1" targetFramework="net45" developmentDependency="true" />
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/Message/AvroTestMessage.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/Message/AvroTestMessage.cs
new file mode 100644
index 0000000..6da55bd
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/Message/AvroTestMessage.cs
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Runtime.Serialization;
+
+namespace org.apache.reef.wake.tests.message
+{
+ /// <summary>
+ /// Internal message header structure used to identify
+ /// the following message in the deserialization stream.
+ /// </summary>
+ [DataContract(Namespace = "org.apache.reef.wake.tests.message")]
+ public class AvroTestMessage
+ {
+ [DataMember]
+ public int number { get; set; }
+
+ [DataMember]
+ public string data { get; set; }
+
+ public AvroTestMessage()
+ {
+ }
+
+ public AvroTestMessage(int number, string data)
+ {
+ this.number = number;
+ this.data = data;
+ }
+ }
+}
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
index e9cb4c6..daf065d 100644
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
@@ -36,17 +36,24 @@
<ItemGroup>
<Reference Include="System" />
<Reference Include="System.Core" />
+ <Reference Include="System.Data" />
<Reference Include="System.Reactive.Core">
<HintPath>$(PackagesDir)\System.Reactive.Core.$(SystemReactiveVersion)\lib\net45\System.Reactive.Core.dll</HintPath>
</Reference>
<Reference Include="System.Reactive.Interfaces">
<HintPath>$(PackagesDir)\System.Reactive.Interfaces.$(SystemReactiveVersion)\lib\net45\System.Reactive.Interfaces.dll</HintPath>
</Reference>
+ <Reference Include="System.Runtime.Serialization" />
+ <Reference Include="Microsoft.Hadoop.Avro">
+ <HintPath>$(PackagesDir)\Microsoft.Hadoop.Avro.$(AvroVersion)\lib\net45\Microsoft.Hadoop.Avro.dll</HintPath>
+ </Reference>
</ItemGroup>
<ItemGroup>
<Compile Include="$(SolutionDir)\SharedAssemblyInfo.cs">
<Link>Properties\SharedAssemblyInfo.cs</Link>
</Compile>
+ <Compile Include="Message\AvroTestMessage.cs" />
+ <Compile Include="ProtocolSerializerTest.cs" />
<Compile Include="ClockTest.cs" />
<Compile Include="MultiCodecTest.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/ProtocolSerializerTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/ProtocolSerializerTest.cs
new file mode 100644
index 0000000..8f4a1ca
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/ProtocolSerializerTest.cs
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Concurrent;
+using System.Net;
+using System.Reactive;
+using Org.Apache.REEF.Wake.Avro;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using org.apache.reef.wake.tests.message;
+using Xunit;
+
+namespace Org.Apache.REEF.Wake.Tests
+{
+ /// <summary>
+ /// Observer to receive and verify test message contents.
+ /// </summary>
+ internal sealed class TestMessageObserver : IObserver<MessageInstance<AvroTestMessage>>
+ {
+ int number;
+ string data;
+
+ public TestMessageObserver(int number, string data)
+ {
+ this.number = number;
+ this.data = data;
+ }
+
+ public void OnNext(MessageInstance<AvroTestMessage> instance)
+ {
+ Assert.Equal(instance.message.number, this.number);
+ Assert.Equal(instance.message.data, this.data);
+ }
+
+ public void OnError(Exception error)
+ {
+ throw new NotImplementedException();
+ }
+
+ public void OnCompleted()
+ {
+ throw new NotImplementedException();
+ }
+ }
+
+ [Collection("FunctionalTests")]
+ public class TestProtocolSerializer
+ {
+ /// <summary>
+ /// Setup two way communication between two remote managers through the loopback
+ /// network and verify that Avro messages are properly serialized and deserialzied
+ /// by the ProtocolSerializer class.
+ /// </summary>
+ [Fact]
+ [Trait("Priority", "1")]
+ public void TestTwoWayCommunication()
+ {
+ // Test data.
+ int[] numbers = { 12, 25 };
+ string[] strings = { "The first string", "The second string" };
+
+ IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+ BlockingCollection<byte[]> queue1 = new BlockingCollection<byte[]>();
+ BlockingCollection<byte[]> queue2 = new BlockingCollection<byte[]>();
+
+ ProtocolSerializer serializer = new ProtocolSerializer(this.GetType().Assembly, "org.apache.reef.wake.tests.message");
+ IRemoteManagerFactory _remoteManagerFactory = TangFactory.GetTang().NewInjector().GetInstance<IRemoteManagerFactory>();
+
+ using (var remoteManager1 = _remoteManagerFactory.GetInstance(listeningAddress, new ByteCodec()))
+ using (var remoteManager2 = _remoteManagerFactory.GetInstance(listeningAddress, new ByteCodec()))
+ {
+ // Register observers for remote manager 1 and remote manager 2
+ var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
+ var observer1 = Observer.Create<byte[]>(queue1.Add);
+ var observer2 = Observer.Create<byte[]>(queue2.Add);
+ remoteManager1.RegisterObserver(remoteEndpoint, observer1);
+ remoteManager2.RegisterObserver(remoteEndpoint, observer2);
+
+ // Remote manager 1 sends avro message to remote manager 2
+ var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+ remoteObserver1.OnNext(serializer.Write(new AvroTestMessage(numbers[0], strings[0]), 1));
+
+ // Remote manager 2 sends avro message to remote manager 1
+ var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint);
+ remoteObserver2.OnNext(serializer.Write(new AvroTestMessage(numbers[1], strings[1]), 2));
+
+ // Verify the messages are properly received.
+ serializer.Read(queue1.Take(), new TestMessageObserver(numbers[1], strings[1]));
+ serializer.Read(queue2.Take(), new TestMessageObserver(numbers[0], strings[0]));
+ }
+ }
+ }
+}
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/packages.config b/lang/cs/Org.Apache.REEF.Wake.Tests/packages.config
index 581efe6..e0d4dbf 100644
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/packages.config
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/packages.config
@@ -18,6 +18,7 @@
under the License.
-->
<packages>
+ <package id="Microsoft.Hadoop.Avro" version="1.5.6" targetFramework="net45" />
<package id="StyleCop.MSBuild" version="4.7.49.1" targetFramework="net45" developmentDependency="true" />
<package id="System.Reactive.Core" version="3.1.1" targetFramework="net451" />
<package id="System.Reactive.Interfaces" version="3.1.1" targetFramework="net451" />
diff --git a/lang/cs/Org.Apache.REEF.Bridge.CLR/Message/Header.cs b/lang/cs/Org.Apache.REEF.Wake/Avro/Message/Header.cs
similarity index 73%
rename from lang/cs/Org.Apache.REEF.Bridge.CLR/Message/Header.cs
rename to lang/cs/Org.Apache.REEF.Wake/Avro/Message/Header.cs
index cf0b939..318515b 100644
--- a/lang/cs/Org.Apache.REEF.Bridge.CLR/Message/Header.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Avro/Message/Header.cs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
//<auto-generated />
-namespace org.apache.reef.bridge.message
+namespace org.apache.reef.wake.avro.message
{
using System;
using System.Collections.Generic;
@@ -23,12 +23,12 @@
using Microsoft.Hadoop.Avro;
/// <summary>
- /// Used to serialize and deserialize Avro record org.apache.reef.bridge.message.Header.
+ /// Used to serialize and deserialize Avro record org.apache.reef.wake.avro.message.Header.
/// </summary>
- [DataContract(Namespace = "org.apache.reef.bridge.message")]
+ [DataContract(Namespace = "org.apache.reef.wake.avro.message")]
public partial class Header
{
- private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.bridge.message.Header"",""doc"":""Identifies the following message in the Java/C# bridge protocol."",""fields"":[{""name"":""identifier"",""doc"":""Identifier of the next message to be read."",""type"":""long""},{""name"":""className"",""doc"":""The fully qualified name of the message class."",""type"":""string""}]}";
+ private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.wake.avro.message.Header"",""doc"":""Identifies the following message in a given protocol."",""fields"":[{""name"":""sequence"",""doc"":""Sequence number of message."",""type"":""long""},{""name"":""className"",""doc"":""The name of the message class."",""type"":""string""}]}";
/// <summary>
/// Gets the schema.
@@ -42,10 +42,10 @@
}
/// <summary>
- /// Gets or sets the identifier field.
+ /// Gets or sets the sequence field.
/// </summary>
[DataMember]
- public long identifier { get; set; }
+ public long sequence { get; set; }
/// <summary>
/// Gets or sets the className field.
@@ -63,11 +63,11 @@
/// <summary>
/// Initializes a new instance of the <see cref="Header"/> class.
/// </summary>
- /// <param name="identifier">The identifier.</param>
+ /// <param name="sequence">The sequence.</param>
/// <param name="className">The className.</param>
- public Header(long identifier, string className)
+ public Header(long sequence, string className)
{
- this.identifier = identifier;
+ this.sequence = sequence;
this.className = className;
}
}
diff --git a/lang/cs/Org.Apache.REEF.Wake/Avro/MessageInstance.cs b/lang/cs/Org.Apache.REEF.Wake/Avro/MessageInstance.cs
new file mode 100644
index 0000000..96ac8a4
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Avro/MessageInstance.cs
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License
+
+namespace Org.Apache.REEF.Wake.Avro
+{
+ /// <summary>
+ /// Wrapper class to bind a specific instance of an Avro messagage
+ /// with it associated sequence number.
+ /// </summary>
+ /// <typeparam name="T"></typeparam>
+ public struct MessageInstance<T>
+ {
+ public long sequence;
+ public T message;
+
+ public MessageInstance(long sequence, T message)
+ {
+ this.sequence = sequence;
+ this.message = message;
+ }
+ }
+}
diff --git a/lang/cs/Org.Apache.REEF.Wake/Avro/ProtocolSerializer.cs b/lang/cs/Org.Apache.REEF.Wake/Avro/ProtocolSerializer.cs
new file mode 100644
index 0000000..dff30b1
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Avro/ProtocolSerializer.cs
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Reflection;
+using Microsoft.Hadoop.Avro;
+using Org.Apache.REEF.Utilities.Logging;
+using org.apache.reef.wake.avro.message;
+
+namespace Org.Apache.REEF.Wake.Avro
+{
+ /// <summary>
+ /// Given a namespace of Avro messages which represent a protocol, the ProtocolSerailizer
+ /// class reflects all of the message classes and builds Avro seriailziers and deserializers.
+ /// Avro messages are then serialized/deserilaized to and from byte[] arrays using the
+ /// Read/Write methods. A transport such as a RemoteObserver using a ByteCodec can then
+ /// be used to send and receive the serialized messages.
+ /// </summary>
+ public sealed class ProtocolSerializer
+ {
+ private static readonly Logger Logr = Logger.GetLogger(typeof(ProtocolSerializer));
+
+ // Delagates for message serializers and deserializers.
+ private delegate void Serialize(MemoryStream stream, object message);
+ private delegate void Deserialize(MemoryStream stream, object observer, long sequence);
+
+ // Message type to serialize/derserialize delagate.
+ private readonly SortedDictionary<string, Serialize> serializeMap = new SortedDictionary<string, Serialize>();
+ private readonly SortedDictionary<string, Deserialize> deserializeMap = new SortedDictionary<string, Deserialize>();
+
+ private readonly IAvroSerializer<Header> headerSerializer = AvroSerializer.Create<Header>();
+
+ /// <summary>
+ /// Register all of the protocol messages using reflection.
+ /// </summary>
+ /// <param name="assembly">The Assembley object which contains the namespace of the message classes.</param>
+ /// <param name="messageNamespace">A string which contains the namespace the protocol messages.</param>
+ public ProtocolSerializer(Assembly assembly, string messageNamespace)
+ {
+ MethodInfo registerInfo = typeof(ProtocolSerializer).GetMethod("Register", BindingFlags.Instance | BindingFlags.NonPublic);
+ MethodInfo genericInfo;
+
+ Logr.Log(Level.Info, "Retrieving types for assembly: {0}", assembly.FullName);
+ List<Type> types = new List<Type>(assembly.GetTypes());
+ types.Add(typeof(Header));
+
+ foreach (Type type in types)
+ {
+ string name = type.FullName;
+ if (name.StartsWith(messageNamespace))
+ {
+ genericInfo = registerInfo.MakeGenericMethod(new[] { type });
+ genericInfo.Invoke(this, null);
+ }
+ }
+ }
+
+ /// <summary>
+ /// Generate and store the metadata necessary to serialze and deserialize a specific message type.
+ /// </summary>
+ /// <typeparam name="TMessage">The class type of the message being registered.</typeparam>
+ internal void Register<TMessage>()
+ {
+ Logr.Log(Level.Info, "Registering message type: {0} {1}", typeof(TMessage).FullName, typeof(TMessage).Name);
+
+ IAvroSerializer<TMessage> messageSerializer = AvroSerializer.Create<TMessage>();
+ Serialize serialize = (MemoryStream stream, object message) =>
+ {
+ messageSerializer.Serialize(stream, (TMessage)message);
+ };
+ serializeMap.Add(typeof(TMessage).Name, serialize);
+
+ Deserialize deserialize = (MemoryStream stream, object observer, long sequence) =>
+ {
+ TMessage message = messageSerializer.Deserialize(stream);
+ IObserver<MessageInstance<TMessage>> msgObserver = observer as IObserver<MessageInstance<TMessage>>;
+ if (msgObserver != null)
+ {
+ msgObserver.OnNext(new MessageInstance<TMessage>(sequence, message));
+ }
+ else
+ {
+ Logr.Log(Level.Warning, "Unhandled message received: {0}", message);
+ }
+ };
+ deserializeMap.Add(typeof(TMessage).Name, deserialize);
+ }
+
+ /// <summary>
+ /// Serialize the input message and return a byte array.
+ /// </summary>
+ /// <param name="message">An object reference to the messeage to be serialized</param>
+ /// <param name="sequence">A long which cotains the higher level protocols sequence number for the message.</param>
+ /// <returns>A byte array containing the serialized header and message.</returns>
+ public byte[] Write(object message, long sequence)
+ {
+ string name = message.GetType().Name;
+ Logr.Log(Level.Info, "Serializing message: {0}", name);
+ try
+ {
+ using (MemoryStream stream = new MemoryStream())
+ {
+ Header header = new Header(sequence, name);
+ headerSerializer.Serialize(stream, header);
+
+ Serialize serialize;
+ if (serializeMap.TryGetValue(name, out serialize))
+ {
+ serialize(stream, message);
+ }
+ else
+ {
+ throw new SeializationException("Request to serialize unknown message type: " + name);
+ }
+ return stream.GetBuffer();
+ }
+ }
+ catch (Exception e)
+ {
+ Logr.Log(Level.Error, "Failure writing message.", e);
+ throw e;
+ }
+ }
+
+ /// <summary>
+ /// Read a message from the input byte array.
+ /// </summary>
+ /// <param name="data">The byte array containing a header message and message to be deserialized.</param>
+ /// <param name="observer">An object which implements the IObserver<> interface for the message being deserialized.</param>
+ public void Read(byte[] data, object observer)
+ {
+ try
+ {
+ using (MemoryStream stream = new MemoryStream(data))
+ {
+ Header head = headerSerializer.Deserialize(stream);
+ Deserialize deserialize;
+ if (deserializeMap.TryGetValue(head.className, out deserialize))
+ {
+ deserialize(stream, observer, head.sequence);
+ }
+ else
+ {
+ throw new SeializationException("Request to deserialize unknown message type: " + head.className);
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ Logr.Log(Level.Error, "Failure reading message.", e);
+ throw e;
+ }
+ }
+ }
+}
diff --git a/lang/cs/Org.Apache.REEF.Wake/Avro/SeializationException.cs b/lang/cs/Org.Apache.REEF.Wake/Avro/SeializationException.cs
new file mode 100644
index 0000000..4ecc4d9
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Avro/SeializationException.cs
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License
+
+using System;
+
+namespace Org.Apache.REEF.Wake.Avro
+{
+ /// <summary>
+ /// The SeializationException is generated when an Avro serializer is
+ /// requested to send or receive a message that does not exist is in
+ /// message namespace it was given at construction time.
+ /// </summary>
+ class SeializationException : Exception
+ {
+ public SeializationException(string message) : base(message)
+ {
+ }
+ }
+}
diff --git a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
index a380400..9dcc039 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
+++ b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
@@ -27,9 +27,17 @@
<FileAlignment>512</FileAlignment>
<RestorePackages>true</RestorePackages>
<SolutionDir Condition="$(SolutionDir) == '' Or $(SolutionDir) == '*Undefined*'">..</SolutionDir>
+ <AvroBinaryDirectory>..\packages\AvroBin</AvroBinaryDirectory>
+ <AvroSchemaDirectory>..\..\common\wake\avro</AvroSchemaDirectory>
+ <AvroTools>..\packages\Microsoft.Avro.Tools.0.1.0\lib\net451\Microsoft.Avro.Tools.exe</AvroTools>
+ <AvroLibrary>..\packages\Microsoft.Avro.Core.0.1.0\lib\net451\Microsoft.Avro.Core.dll</AvroLibrary>
+ <NewtonsoftLibrary>..\packages\Newtonsoft.Json.10.0.3\lib\net45\Newtonsoft.Json.dll</NewtonsoftLibrary>
</PropertyGroup>
<Import Project="$(SolutionDir)\build.props" />
<ItemGroup>
+ <Reference Include="Microsoft.Hadoop.Avro">
+ <HintPath>$(PackagesDir)\Microsoft.Hadoop.Avro.$(AvroVersion)\lib\net45\Microsoft.Hadoop.Avro.dll</HintPath>
+ </Reference>
<Reference Include="Microsoft.Practices.TransientFaultHandling.Core, Version=5.1.1209.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<HintPath>$(PackagesDir)\TransientFaultHandling.Core.5.1.1209.1\lib\NET4\Microsoft.Practices.TransientFaultHandling.Core.dll</HintPath>
</Reference>
@@ -38,6 +46,7 @@
</Reference>
<Reference Include="System" />
<Reference Include="System.Core" />
+ <Reference Include="System.Data" />
<Reference Include="System.Reactive.Core">
<HintPath>$(PackagesDir)\System.Reactive.Core.$(SystemReactiveVersion)\lib\net45\System.Reactive.Core.dll</HintPath>
<Private>True</Private>
@@ -45,6 +54,7 @@
<Reference Include="System.Reactive.Interfaces">
<HintPath>$(PackagesDir)\System.Reactive.Interfaces.$(SystemReactiveVersion)\lib\net45\System.Reactive.Interfaces.dll</HintPath>
</Reference>
+ <Reference Include="System.Runtime.Serialization" />
<Reference Include="System.Xml" />
</ItemGroup>
<ItemGroup>
@@ -52,6 +62,10 @@
<Link>Properties\SharedAssemblyInfo.cs</Link>
</Compile>
<Compile Include="AbstractEStage.cs" />
+ <Compile Include="Avro\MessageInstance.cs" />
+ <Compile Include="Avro\Message\Header.cs" />
+ <Compile Include="Avro\ProtocolSerializer.cs" />
+ <Compile Include="Avro\SeializationException.cs" />
<Compile Include="Examples\P2p\IEventSource.cs" />
<Compile Include="Examples\P2p\Pull2Push.cs" />
<Compile Include="IEStage.cs" />
@@ -205,5 +219,6 @@
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<Import Project="$(SolutionDir)\.nuget\NuGet.targets" Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" />
+ <Import Project="$(SolutionDir)\AvroCodeGeneration.Targets" Condition="Exists('$(SolutionDir)\AvroCodeGeneration.Targets')" />
<Import Project="$(PackagesDir)\StyleCop.MSBuild.$(StyleCopVersion)\build\StyleCop.MSBuild.Targets" Condition="Exists('$(PackagesDir)\StyleCop.MSBuild.$(StyleCopVersion)\build\StyleCop.MSBuild.Targets')" />
</Project>
diff --git a/lang/cs/Org.Apache.REEF.Wake/packages.config b/lang/cs/Org.Apache.REEF.Wake/packages.config
index e176d57..beb1da1 100644
--- a/lang/cs/Org.Apache.REEF.Wake/packages.config
+++ b/lang/cs/Org.Apache.REEF.Wake/packages.config
@@ -18,6 +18,10 @@
under the License.
-->
<packages>
+ <package id="Microsoft.Avro.Core" version="0.1.0" targetFramework="net451" developmentDependency="true" />
+ <package id="Microsoft.Avro.Tools" version="0.1.0" targetFramework="net451" developmentDependency="true" />
+ <package id="Newtonsoft.Json" version="10.0.3" targetFramework="net451" developmentDependency="true" />
+ <package id="Microsoft.Hadoop.Avro" version="1.5.6" targetFramework="net45" />
<package id="protobuf-net" version="2.0.0.668" targetFramework="net45" />
<package id="StyleCop.MSBuild" version="4.7.49.1" targetFramework="net45" developmentDependency="true" />
<package id="System.Reactive.Core" version="3.1.1" targetFramework="net451" />
diff --git a/lang/cs/nuget.config b/lang/cs/nuget.config
index ddd79f7..79317f3 100644
--- a/lang/cs/nuget.config
+++ b/lang/cs/nuget.config
@@ -19,6 +19,7 @@
-->
<configuration>
<packageSources>
+ <add key="nuget.org" value="https://www.nuget.org/api/V2" />
<add key="dotnet" value="https://dotnet.myget.org/F/dotnet-core/api/v3/index.json" />
</packageSources>
</configuration>