[REEF-1819] Add Avro message protocol serializer to Java Side.

   * Added a ProtocolSerializer class to reef.wake which automatically
     construct all of the serializers and deserializers for all of the
     Avro message classes in a given package namespace.
   * Added unit test demonstrate avro message works across java/C# sockets.

JIRA:
  [REEF-1819](https://issues.apache.org/jira/browse/REEF-1819)

Pull Request:
  This closes #1327
diff --git a/lang/common/wake/avro/AvroTestMessage.avsc b/lang/common/wake/avro/AvroTestMessage.avsc
new file mode 100644
index 0000000..403d900
--- /dev/null
+++ b/lang/common/wake/avro/AvroTestMessage.avsc
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+[
+  /*
+   * Identify the next message in the Java/C# bridge protocol.
+   */
+  {
+    "namespace":"org.apache.reef.wake.test.avro.message",
+    "type":"record",
+    "name":"AvroTestMessage",
+    "doc":"Identifies the following message in a given protocol.",
+    "fields":[
+      {
+        "name":"number",
+        "doc":"Integer test field.",
+        "type":"int"
+      },
+      {
+        "name":"data",
+        "doc":"String test field.",
+        "type":"string"
+      }
+    ]
+  }
+]
diff --git a/lang/common/wake/avro/Header.avsc b/lang/common/wake/avro/Header.avsc
index 8efd4f5..b2882b3 100644
--- a/lang/common/wake/avro/Header.avsc
+++ b/lang/common/wake/avro/Header.avsc
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
diff --git a/lang/java/reef-wake/wake/pom.xml b/lang/java/reef-wake/wake/pom.xml
index 7f1cfee..cdb82dc 100644
--- a/lang/java/reef-wake/wake/pom.xml
+++ b/lang/java/reef-wake/wake/pom.xml
@@ -52,6 +52,22 @@
         </resources>
 
         <plugins>
+           <plugin>
+              <groupId>org.apache.avro</groupId>
+              <artifactId>avro-maven-plugin</artifactId>
+              <executions>
+                <execution>
+                    <phase>generate-sources</phase>
+                    <goals>
+                        <goal>schema</goal>
+                    </goals>
+                    <configuration>
+                        <sourceDirectory>${rootPath}/lang/common/wake/avro/</sourceDirectory>
+                        <outputDirectory>${project.basedir}/target/generated-sources/avro/</outputDirectory>
+                    </configuration>
+                </execution>
+              </executions>
+            </plugin>
             <plugin>
                 <artifactId>maven-antrun-plugin</artifactId>
                 <executions>
@@ -151,6 +167,15 @@
             <groupId>net.jcip</groupId>
             <artifactId>jcip-annotations</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.github.lukehutch</groupId>
+            <artifactId>fast-classpath-scanner</artifactId>
+            <version>${fast-classpath-scanner.version}</version>
+        </dependency>
     </dependencies>
 
 </project>
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/MultiObserver.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/MultiObserver.java
new file mode 100644
index 0000000..e310439
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/MultiObserver.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.wake;
+
+import java.lang.reflect.InvocationTargetException;
+
+/**
+ * MultiObserver interface definition whose implementation allows the
+ * a single java class to be the recipient of multiple events by simply
+ * defining the methods to handle those events.
+ */
+public interface MultiObserver {
+  /**
+   * Generic event onNext method in the base interface which maps the call to a concrete
+   * event onNext method in TSubCls if one exists otherwise unimplemented is invoked.
+   * @param event An event of type TEvent which will be sent to TSubCls as appropriate.
+   * @param <TEvent> The type of the event being processed.
+   */
+  <TEvent> void onNext(long identifier, TEvent event) throws IllegalAccessException, InvocationTargetException;
+}
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/IMessageDeserializer.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/IMessageDeserializer.java
new file mode 100644
index 0000000..e918d90
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/IMessageDeserializer.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.avro;
+
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.reef.wake.MultiObserver;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+
+/**
+ * Base interface for Avro message deserializer objects.
+ */
+public interface IMessageDeserializer {
+  /**
+   * Deserialize messages of type TMessage from input decoder.
+   * @param decoder An Avro BinaryDecoder instance that is reading the input stream containing the message.
+   * @param observer An instance of the MultiObserver class that will process the message.
+   * @param sequence A long value which contains the sequence number of the message in the input stream.
+   * @throws IOException Read of input stream in decoder failed.
+   * @throws IllegalAccessException Target method in observer is not accessible.
+   * @throws InvocationTargetException Subclass threw and exception.
+   */
+  void deserialize(BinaryDecoder decoder, MultiObserver observer, long sequence)
+   throws IOException, IllegalAccessException, InvocationTargetException;
+}
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/IMessageSerializer.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/IMessageSerializer.java
new file mode 100644
index 0000000..9c1e071
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/IMessageSerializer.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.wake.avro;
+
+import org.apache.avro.specific.SpecificRecord;
+
+import java.io.IOException;
+import java.io.ByteArrayOutputStream;
+
+/**
+ * Base interface for Avro message serializer objects.
+ */
+public interface IMessageSerializer {
+  /**
+   * Deserialize messages of type TMessage from input outputStream.
+   * @param outputStream A ByteArrayOutputStream where the message to
+   *                     be serialized will be written.
+   * @param message An Avro message class which implements the Avro SpcificRecord interface.
+   * @param sequence The numerical position of the message in the outgoing message stream.
+   * @throws IOException An error occurred writing the message to the outputStream.
+   */
+  void serialize(ByteArrayOutputStream outputStream, SpecificRecord message, long sequence)
+    throws IOException;
+}
+
+
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/ProtocolSerializer.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/ProtocolSerializer.java
new file mode 100644
index 0000000..ad10d5a
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/ProtocolSerializer.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.wake.avro;
+
+import org.apache.avro.io.*;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.reef.wake.MultiObserver;
+import org.apache.reef.wake.avro.message.Header;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import io.github.lukehutch.fastclasspathscanner.FastClasspathScanner;
+import io.github.lukehutch.fastclasspathscanner.scanner.ScanResult;
+
+/**
+ * The ProtocolSerializer generates serializers and deserializers for
+ * all of the Avro messages contained in a specified package. The name
+ * of the package must have "message" as the final component of the
+ * package name. For example, Avro messages in the org.foo.me package
+ * would sit in the org.foo.me.messages package.
+ */
+public final class ProtocolSerializer {
+  private static final Logger LOG = Logger.getLogger(ProtocolSerializer.class.getName());
+  // Maps for mapping message class names to serializer and deserializer classes.
+  private final Map<String, IMessageSerializer> nameToSerializerMap = new HashMap<>();
+  private final Map<String, IMessageDeserializer> nameToDeserializerMap = new HashMap<>();
+  private final SpecificDatumReader<Header> headerReader = new SpecificDatumReader<>(Header.class);
+
+  /**
+   * Finds all of the messages in the specified packaged and calls register.
+   * @param messagePackage A string which contains the full name of the
+   *                       package containing the protocol messages.
+   */
+  public ProtocolSerializer(final String messagePackage) {
+    // Build a list of the message reflection classes.
+    final ScanResult scanResult = new FastClasspathScanner(messagePackage).scan();
+    final List<String> scanNames = scanResult.getNamesOfSubclassesOf(SpecificRecordBase.class);
+    final List<Class<?>> messageClasses = scanResult.classNamesToClassRefs(scanNames);
+
+    // Add the header message from the org.apache.reef.wake.avro.message package.
+    messageClasses.add(Header.class);
+
+    try {
+      // Register all of the messages in the specified package.
+      for (final Class<?> cls : messageClasses) {
+        this.register(cls);
+      }
+    } catch (final Exception e) {
+      throw new RuntimeException("Message registration failed", e);
+    }
+  }
+
+  /**
+   * Instantiates and adds a message serializer/deserializer for the message.
+   * @param msgMetaClass The reflection class for the message.
+   * @param <TMessage> The Java type of the message being registered.
+   */
+  public <TMessage> void register(final Class<TMessage> msgMetaClass) {
+    LOG.log(Level.INFO, "Registering message: {0}", msgMetaClass.getSimpleName());
+    nameToSerializerMap.put(msgMetaClass.getSimpleName(), SerializationFactory.createSerializer(msgMetaClass));
+    nameToDeserializerMap.put(msgMetaClass.getSimpleName(), SerializationFactory.createDeserializer(msgMetaClass));
+  }
+
+  /**
+   * Marshall the input message to a byte array.
+   * @param message The message to be marshaled into a byte array.
+   * @param sequence The unique sequence number of the message.
+   */
+  public byte[] write(final SpecificRecord message, final long sequence) {
+    try (final ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
+      final String name = message.getClass().getSimpleName();
+      LOG.log(Level.FINE, "Serializing message: {0}", name);
+
+      final IMessageSerializer serializer = nameToSerializerMap.get(name);
+      if (serializer != null) {
+        serializer.serialize(outputStream, message, sequence);
+      }
+
+      return outputStream.toByteArray();
+    } catch (final Exception e) {
+      throw new RuntimeException("Failure writing message: " + message.getClass().getCanonicalName(), e);
+    }
+  }
+
+  /**
+   * Read a message from the input byte stream and send it to the event handler.
+   * @param messageBytes An array of bytes that contains the message to be deserialized.
+   * @param observer An implementation of the MultiObserver interface which will be called
+   *                 to process the deserialized message.
+   */
+  public void read(final byte[] messageBytes, final MultiObserver observer) {
+    try (final InputStream inputStream = new ByteArrayInputStream(messageBytes)) {
+      // Binary decoder for both the header and the message.
+      final BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
+
+      // Read the header message.
+      final Header header = headerReader.read(null, decoder);
+      LOG.log(Level.FINE, "Deserializing Avro message: {0}", header.getClassName());
+
+      // Get the appropriate deserializer and deserialize the message.
+      final IMessageDeserializer deserializer = nameToDeserializerMap.get(header.getClassName().toString());
+      if (deserializer != null) {
+        deserializer.deserialize(decoder, observer, header.getSequence());
+      } else {
+        throw new RuntimeException("Request to deserialize unknown message type: " +  header.getClassName());
+      }
+
+    } catch (final Exception e) {
+      throw new RuntimeException("Failure reading message: ", e);
+    }
+  }
+}
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/SerializationFactory.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/SerializationFactory.java
new file mode 100644
index 0000000..be3031d
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/SerializationFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.wake.avro;
+
+import org.apache.reef.wake.avro.impl.MessageSerializerImpl;
+import org.apache.reef.wake.avro.impl.MessageDeserializerImpl;
+
+/**
+ * Provides Avro message specific message serializers and deserializers.
+ */
+public final class SerializationFactory {
+
+  private SerializationFactory() {}
+
+  /**
+   * Instantiate an Avro message serializer for the message type in the generic parameter.
+   * @param msgMetaClass The reflection class for the message.
+   * @param <TMessage> The type of the Avro message to be serialized.
+   * @return A reference to an IMessageSerializer interface.
+   */
+  public static <TMessage> IMessageSerializer createSerializer(final Class<TMessage> msgMetaClass) {
+    return new MessageSerializerImpl<>(msgMetaClass);
+  }
+
+  /**
+   * Instantiate an Avro message deserializer for the message type in the generic parameter.
+   * @param msgMetaClass The reflection class for the message.
+   * @param <TMessage> The type of the Avro message to be deserialized.
+   * @return A reference to an IMessageDeserializer interface.
+   */
+  public static <TMessage> IMessageDeserializer createDeserializer(final Class<TMessage> msgMetaClass) {
+    return new MessageDeserializerImpl<>(msgMetaClass);
+  }
+}
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/impl/MessageDeserializerImpl.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/impl/MessageDeserializerImpl.java
new file mode 100644
index 0000000..55d5716
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/impl/MessageDeserializerImpl.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.wake.avro.impl;
+
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.reef.wake.MultiObserver;
+import org.apache.reef.wake.avro.IMessageDeserializer;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+
+/**
+ * Implementation of Avro message specific deserializer.
+ * @param <TMessage> Java type of the message the instantiation can deserialize.
+ */
+public class MessageDeserializerImpl<TMessage> implements IMessageDeserializer {
+  protected Class<TMessage> msgMetaClass;
+  private final SpecificDatumReader<TMessage> messageReader;
+
+  /**
+   * Initialize message specific deserializer.
+   * @param msgMetaClass The reflection class for the message.
+   * @param msgMetaClass
+   */
+  public MessageDeserializerImpl(final Class<TMessage> msgMetaClass) {
+    this.msgMetaClass = msgMetaClass;
+    this.messageReader = new SpecificDatumReader<>(msgMetaClass);
+  }
+
+  /**
+   * Deserialize messages of type TMessage from input decoder.
+   * @param decoder An Avro BinaryDecoder instance that is reading the input stream containing the message.
+   * @param observer An instance of the MultiObserver class that will process the message.
+   * @param sequence A long value which contains the sequence number of the message in the input stream.
+   * @throws IOException Read of input stream in decoder failed.
+   * @throws IllegalAccessException Target method in observer is not accessible.
+   * @throws InvocationTargetException Subclass threw and exception.
+   */
+  public void deserialize(final BinaryDecoder decoder, final MultiObserver observer, final long sequence)
+    throws IOException, IllegalAccessException, InvocationTargetException {
+
+    final TMessage message = messageReader.read(null, decoder);
+    if (message != null) {
+      observer.onNext(sequence, message);
+    } else {
+      throw new RuntimeException("Failed to deserialize message [" + msgMetaClass.getSimpleName() + "]");
+    }
+  }
+}
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/impl/MessageSerializerImpl.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/impl/MessageSerializerImpl.java
new file mode 100644
index 0000000..f225cbb
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/impl/MessageSerializerImpl.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.wake.avro.impl;
+
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.reef.wake.avro.IMessageSerializer;
+import org.apache.reef.wake.avro.message.Header;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+/**
+ * Implementation of Avro message specific serializer.
+ * @param <TMessage> Java type of the message the instantiation can serialize.
+ */
+public class MessageSerializerImpl<TMessage> implements IMessageSerializer {
+  private final String msgMetaClassName;
+  // Writers for header and message.
+  private final DatumWriter<Header> headerWriter = new SpecificDatumWriter<>(Header.class);
+  private final DatumWriter<TMessage> messageWriter;
+
+  /**
+   * Initialize message specific serializer.
+   * @param msgMetaClass The reflection class for the message.
+   */
+  public MessageSerializerImpl(final Class<TMessage> msgMetaClass) {
+    this.msgMetaClassName = msgMetaClass.getSimpleName();
+    this.messageWriter = new SpecificDatumWriter<>(msgMetaClass);
+  }
+
+  /**
+   * Deserialize messages of type TMessage from input outputStream.
+   * @param outputStream A ByteArrayOutputStream where the message to
+   *                     be serialized will be written.
+   * @param message An Avro message class which implements the Avro SpcificRecord interface.
+   * @param sequence The numerical position of the message in the outgoing message stream.
+   * @throws IOException An error occurred writing the message to the outputStream.
+   */
+  public void serialize(final ByteArrayOutputStream outputStream,
+                        final SpecificRecord message, final long sequence) throws IOException {
+    // Binary encoder for both the header and message.
+    final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);
+    // Write the header and the message.
+    headerWriter.write(new Header(sequence, msgMetaClassName), encoder);
+    messageWriter.write((TMessage)message, encoder);
+    encoder.flush();
+  }
+}
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/impl/package-info.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/impl/package-info.java
new file mode 100644
index 0000000..a10a919
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/impl/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Implementations of serializer and derserializer interfaces.
+ */
+package org.apache.reef.wake.avro.impl;
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/package-info.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/package-info.java
new file mode 100644
index 0000000..9423ed9
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Wake avro serialization classes.
+ */
+package org.apache.reef.wake.avro;
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/MultiObserverImpl.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/MultiObserverImpl.java
new file mode 100644
index 0000000..fb3935d
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/MultiObserverImpl.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.wake.impl;
+
+import org.apache.reef.wake.MultiObserver;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * The MultiObserverImpl class uses reflection to discover which onNext()
+ * event processing methods are defined and then map events to them.
+ * @param <TSubCls> The subclass derived from MultiObserverImpl.
+ */
+public abstract class MultiObserverImpl<TSubCls> implements MultiObserver {
+  private static final Logger LOG = Logger.getLogger(MultiObserverImpl.class.getName());
+  private final Map<String, Method> methodMap = new HashMap<>();
+
+  /**
+   * Use reflection to discover all of the event processing methods in TSubCls
+   * and setup a means to direct calls from the generic event onNext method defined
+   * in the MultiObserver interface to specific concrete event onNext methods.
+   */
+  public MultiObserverImpl() {
+    // Iterate across the methods and build a hash map of class names to reflection methods.
+    for (final Method method : this.getClass().getMethods()) {
+      if (method.getName().equals("onNext") && method.getDeclaringClass().equals(this.getClass())) {
+        // This is an onNext method defined in TSubCls
+        final Class<?>[] types = method.getParameterTypes();
+        if (types.length == 2 && types[0].equals(Long.TYPE)) {
+          methodMap.put(types[1].getName(), method);
+        }
+      }
+    }
+  }
+
+  /**
+   * Called when an event is received that does not have an onNext method definition
+   * in TSubCls. Override in TSubClas to handle the error.
+   * @param event A reference to an object which is an event not handled by TSubCls.
+   * @param <TEvent> The type of the event being processed.
+   */
+  private <TEvent> void unimplemented(final long identifier, final TEvent event) {
+    LOG.log(Level.INFO, "Unimplemented event: [{0}]: {1}",
+        new String[]{String.valueOf(identifier), event.getClass().getName()});
+  }
+
+  /**
+   * Generic event onNext method in the base interface which maps the call to a concrete
+   * event onNext method in TSubCls if one exists otherwise unimplemented is invoked.
+   * @param event An event of type TEvent which will be sent to TSubCls as appropriate.
+   * @param <TEvent> The type of the event being processed.
+   */
+  @Override
+  public <TEvent> void onNext(final long identifier, final TEvent event)
+    throws IllegalAccessException, InvocationTargetException {
+
+    // Get the reflection method for this call.
+    final Method onNext = methodMap.get(event.getClass().getName());
+    if (onNext != null) {
+      // Process the event.
+      onNext.invoke((TSubCls) this, identifier, event);
+    } else {
+      // Log the unprocessed event.
+      unimplemented(identifier, event);
+    }
+  }
+}
diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/avro/ProtocolSerializerTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/avro/ProtocolSerializerTest.java
new file mode 100644
index 0000000..31d142a
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/avro/ProtocolSerializerTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.test.avro;
+
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.avro.ProtocolSerializer;
+import org.apache.reef.wake.impl.LoggingEventHandler;
+import org.apache.reef.wake.impl.MultiObserverImpl;
+import org.apache.reef.wake.remote.*;
+import org.apache.reef.wake.remote.address.LocalAddressProvider;
+import org.apache.reef.wake.remote.impl.ByteCodec;
+import org.apache.reef.wake.remote.ports.TcpPortProvider;
+import org.apache.reef.wake.test.avro.message.AvroTestMessage;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Logger;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ *  Verify the protocol serializer can serialize and deserialize messages
+ *  exchanged between two remote manager classes.
+ */
+public final class ProtocolSerializerTest {
+  private static final Logger LOG = Logger.getLogger(ProtocolSerializer.class.getName());
+
+  @Rule
+  public final TestName name = new TestName();
+
+  /**
+   * Verify Avro message can be serialized and deserialized
+   * between two remote managers.
+   */
+  @Test
+  public void testProtocolSerializerTest() throws Exception {
+    final int[] numbers = {12, 25};
+    final String[] strings = {"The first string", "The second string"};
+
+    // Queues for storing messages byte messages.
+    final BlockingQueue<byte[]> queue1 = new LinkedBlockingQueue<>();
+    final BlockingQueue<byte[]> queue2 = new LinkedBlockingQueue<>();
+
+    // Remote managers for sending and receiving byte messages.
+    final RemoteManager remoteManager1 = getTestRemoteManager("RemoteManagerOne");
+    final RemoteManager remoteManager2 = getTestRemoteManager("RemoteManagerTwo");
+
+    // Register message handlers for byte level messages.
+    remoteManager1.registerHandler(byte[].class, new ByteMessageObserver(queue1));
+    remoteManager2.registerHandler(byte[].class, new ByteMessageObserver(queue2));
+
+    final EventHandler<byte[]> sender1 = remoteManager1.getHandler(remoteManager2.getMyIdentifier(), byte[].class);
+    final EventHandler<byte[]> sender2 = remoteManager2.getHandler(remoteManager1.getMyIdentifier(), byte[].class);
+
+    final ProtocolSerializer serializer = new ProtocolSerializer("org.apache.reef.wake.test.avro.message");
+
+    sender1.onNext(serializer.write(new AvroTestMessage(numbers[0], strings[0]), 1));
+    sender2.onNext(serializer.write(new AvroTestMessage(numbers[1], strings[1]), 2));
+
+    final AvroMessageObserver avroObserver1 = new AvroMessageObserver();
+    final AvroMessageObserver avroObserver2 = new AvroMessageObserver();
+
+    serializer.read(queue1.take(), avroObserver1);
+    serializer.read(queue2.take(), avroObserver2);
+
+    assertEquals(numbers[0], avroObserver2.getNumber());
+    assertEquals(strings[0], avroObserver2.getDataString());
+
+    assertEquals(numbers[1], avroObserver1.getNumber());
+    assertEquals(strings[1], avroObserver1.getDataString());
+  }
+
+  /**
+   * Build a remote manager on the local IP address with an unused port.
+   * @param identifier The identifier of the remote manager.
+   * @return A RemoteManager instance listing on the local IP address
+   *         with a unique port number.
+   */
+  private RemoteManager getTestRemoteManager(final String identifier) throws InjectionException {
+    final int port = 0;
+    final boolean order = true;
+    final int retries = 3;
+    final int timeOut = 10000;
+
+    final Injector injector = Tang.Factory.getTang().newInjector();
+    final LocalAddressProvider localAddressProvider = injector.getInstance(LocalAddressProvider.class);
+    final TcpPortProvider tcpPortProvider = injector.getInstance(TcpPortProvider.class);
+    final RemoteManagerFactory remoteManagerFactory = injector.getInstance(RemoteManagerFactory.class);
+
+    return remoteManagerFactory.getInstance(
+    identifier, localAddressProvider.getLocalAddress(), port, new ByteCodec(),
+      new LoggingEventHandler<Throwable>(), order, retries, timeOut,
+      localAddressProvider, tcpPortProvider);
+  }
+
+  private final class ByteMessageObserver implements EventHandler<RemoteMessage<byte[]>> {
+    private final BlockingQueue<byte[]> queue;
+
+    /**
+     * @param queue Queue where incoming messages will be stored.
+     */
+    ByteMessageObserver(final BlockingQueue<byte[]> queue) {
+      this.queue = queue;
+    }
+
+    /**
+     * Deserialize and direct incoming messages to the registered MuiltiObserver event handler.
+     * @param message A RemoteMessage<byte[]> object which will be deserialized.
+     */
+    public void onNext(final RemoteMessage<byte[]> message) {
+      queue.add(message.getMessage());
+    }
+  }
+
+  /**
+   * Processes messages from the network remote manager.
+   */
+  public final class AvroMessageObserver extends MultiObserverImpl<AvroMessageObserver> {
+    private int number;
+    private String dataString;
+
+    // Accessors
+    int getNumber() {
+      return number;
+    }
+
+    String getDataString() {
+      return dataString;
+    }
+
+    /**
+     * Processes protocol messages from the C# side of the bridge.
+     * @param identifier A long value which is the unique message identifier.
+     * @param message A reference to the received avro test message.
+     */
+    public void onNext(final long identifier, final AvroTestMessage message) {
+      number = message.getNumber();
+      dataString = message.getData().toString();
+    }
+  }
+}
diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/avro/package-info.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/avro/package-info.java
new file mode 100644
index 0000000..63744c7
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/avro/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Tests for org.apache.reef.wake.avro package.
+ */
+package org.apache.reef.wake.test.avro;
diff --git a/pom.xml b/pom.xml
index 02ae63f..9d7ee82 100644
--- a/pom.xml
+++ b/pom.xml
@@ -65,6 +65,7 @@
         <jsr305.version>3.0.1</jsr305.version>
         <kryo.version>3.0.3</kryo.version>
         <kryo-serializers.version>0.37</kryo-serializers.version>
+        <fast-classpath-scanner.version>2.4.5</fast-classpath-scanner.version>
         <rootPath>${user.dir}</rootPath>
     </properties>