[REEF-1819] Add Avro message protocol serializer to Java Side.
* Added a ProtocolSerializer class to reef.wake which automatically
construct all of the serializers and deserializers for all of the
Avro message classes in a given package namespace.
* Added unit test demonstrate avro message works across java/C# sockets.
JIRA:
[REEF-1819](https://issues.apache.org/jira/browse/REEF-1819)
Pull Request:
This closes #1327
diff --git a/lang/common/wake/avro/AvroTestMessage.avsc b/lang/common/wake/avro/AvroTestMessage.avsc
new file mode 100644
index 0000000..403d900
--- /dev/null
+++ b/lang/common/wake/avro/AvroTestMessage.avsc
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+[
+ /*
+ * Identify the next message in the Java/C# bridge protocol.
+ */
+ {
+ "namespace":"org.apache.reef.wake.test.avro.message",
+ "type":"record",
+ "name":"AvroTestMessage",
+ "doc":"Identifies the following message in a given protocol.",
+ "fields":[
+ {
+ "name":"number",
+ "doc":"Integer test field.",
+ "type":"int"
+ },
+ {
+ "name":"data",
+ "doc":"String test field.",
+ "type":"string"
+ }
+ ]
+ }
+]
diff --git a/lang/common/wake/avro/Header.avsc b/lang/common/wake/avro/Header.avsc
index 8efd4f5..b2882b3 100644
--- a/lang/common/wake/avro/Header.avsc
+++ b/lang/common/wake/avro/Header.avsc
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
diff --git a/lang/java/reef-wake/wake/pom.xml b/lang/java/reef-wake/wake/pom.xml
index 7f1cfee..cdb82dc 100644
--- a/lang/java/reef-wake/wake/pom.xml
+++ b/lang/java/reef-wake/wake/pom.xml
@@ -52,6 +52,22 @@
</resources>
<plugins>
+ <plugin>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>schema</goal>
+ </goals>
+ <configuration>
+ <sourceDirectory>${rootPath}/lang/common/wake/avro/</sourceDirectory>
+ <outputDirectory>${project.basedir}/target/generated-sources/avro/</outputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
<plugin>
<artifactId>maven-antrun-plugin</artifactId>
<executions>
@@ -151,6 +167,15 @@
<groupId>net.jcip</groupId>
<artifactId>jcip-annotations</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.github.lukehutch</groupId>
+ <artifactId>fast-classpath-scanner</artifactId>
+ <version>${fast-classpath-scanner.version}</version>
+ </dependency>
</dependencies>
</project>
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/MultiObserver.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/MultiObserver.java
new file mode 100644
index 0000000..e310439
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/MultiObserver.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.wake;
+
+import java.lang.reflect.InvocationTargetException;
+
+/**
+ * MultiObserver interface definition whose implementation allows the
+ * a single java class to be the recipient of multiple events by simply
+ * defining the methods to handle those events.
+ */
+public interface MultiObserver {
+ /**
+ * Generic event onNext method in the base interface which maps the call to a concrete
+ * event onNext method in TSubCls if one exists otherwise unimplemented is invoked.
+ * @param event An event of type TEvent which will be sent to TSubCls as appropriate.
+ * @param <TEvent> The type of the event being processed.
+ */
+ <TEvent> void onNext(long identifier, TEvent event) throws IllegalAccessException, InvocationTargetException;
+}
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/IMessageDeserializer.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/IMessageDeserializer.java
new file mode 100644
index 0000000..e918d90
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/IMessageDeserializer.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.avro;
+
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.reef.wake.MultiObserver;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+
+/**
+ * Base interface for Avro message deserializer objects.
+ */
+public interface IMessageDeserializer {
+ /**
+ * Deserialize messages of type TMessage from input decoder.
+ * @param decoder An Avro BinaryDecoder instance that is reading the input stream containing the message.
+ * @param observer An instance of the MultiObserver class that will process the message.
+ * @param sequence A long value which contains the sequence number of the message in the input stream.
+ * @throws IOException Read of input stream in decoder failed.
+ * @throws IllegalAccessException Target method in observer is not accessible.
+ * @throws InvocationTargetException Subclass threw and exception.
+ */
+ void deserialize(BinaryDecoder decoder, MultiObserver observer, long sequence)
+ throws IOException, IllegalAccessException, InvocationTargetException;
+}
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/IMessageSerializer.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/IMessageSerializer.java
new file mode 100644
index 0000000..9c1e071
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/IMessageSerializer.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.wake.avro;
+
+import org.apache.avro.specific.SpecificRecord;
+
+import java.io.IOException;
+import java.io.ByteArrayOutputStream;
+
+/**
+ * Base interface for Avro message serializer objects.
+ */
+public interface IMessageSerializer {
+ /**
+ * Deserialize messages of type TMessage from input outputStream.
+ * @param outputStream A ByteArrayOutputStream where the message to
+ * be serialized will be written.
+ * @param message An Avro message class which implements the Avro SpcificRecord interface.
+ * @param sequence The numerical position of the message in the outgoing message stream.
+ * @throws IOException An error occurred writing the message to the outputStream.
+ */
+ void serialize(ByteArrayOutputStream outputStream, SpecificRecord message, long sequence)
+ throws IOException;
+}
+
+
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/ProtocolSerializer.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/ProtocolSerializer.java
new file mode 100644
index 0000000..ad10d5a
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/ProtocolSerializer.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.wake.avro;
+
+import org.apache.avro.io.*;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.reef.wake.MultiObserver;
+import org.apache.reef.wake.avro.message.Header;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import io.github.lukehutch.fastclasspathscanner.FastClasspathScanner;
+import io.github.lukehutch.fastclasspathscanner.scanner.ScanResult;
+
+/**
+ * The ProtocolSerializer generates serializers and deserializers for
+ * all of the Avro messages contained in a specified package. The name
+ * of the package must have "message" as the final component of the
+ * package name. For example, Avro messages in the org.foo.me package
+ * would sit in the org.foo.me.messages package.
+ */
+public final class ProtocolSerializer {
+ private static final Logger LOG = Logger.getLogger(ProtocolSerializer.class.getName());
+ // Maps for mapping message class names to serializer and deserializer classes.
+ private final Map<String, IMessageSerializer> nameToSerializerMap = new HashMap<>();
+ private final Map<String, IMessageDeserializer> nameToDeserializerMap = new HashMap<>();
+ private final SpecificDatumReader<Header> headerReader = new SpecificDatumReader<>(Header.class);
+
+ /**
+ * Finds all of the messages in the specified packaged and calls register.
+ * @param messagePackage A string which contains the full name of the
+ * package containing the protocol messages.
+ */
+ public ProtocolSerializer(final String messagePackage) {
+ // Build a list of the message reflection classes.
+ final ScanResult scanResult = new FastClasspathScanner(messagePackage).scan();
+ final List<String> scanNames = scanResult.getNamesOfSubclassesOf(SpecificRecordBase.class);
+ final List<Class<?>> messageClasses = scanResult.classNamesToClassRefs(scanNames);
+
+ // Add the header message from the org.apache.reef.wake.avro.message package.
+ messageClasses.add(Header.class);
+
+ try {
+ // Register all of the messages in the specified package.
+ for (final Class<?> cls : messageClasses) {
+ this.register(cls);
+ }
+ } catch (final Exception e) {
+ throw new RuntimeException("Message registration failed", e);
+ }
+ }
+
+ /**
+ * Instantiates and adds a message serializer/deserializer for the message.
+ * @param msgMetaClass The reflection class for the message.
+ * @param <TMessage> The Java type of the message being registered.
+ */
+ public <TMessage> void register(final Class<TMessage> msgMetaClass) {
+ LOG.log(Level.INFO, "Registering message: {0}", msgMetaClass.getSimpleName());
+ nameToSerializerMap.put(msgMetaClass.getSimpleName(), SerializationFactory.createSerializer(msgMetaClass));
+ nameToDeserializerMap.put(msgMetaClass.getSimpleName(), SerializationFactory.createDeserializer(msgMetaClass));
+ }
+
+ /**
+ * Marshall the input message to a byte array.
+ * @param message The message to be marshaled into a byte array.
+ * @param sequence The unique sequence number of the message.
+ */
+ public byte[] write(final SpecificRecord message, final long sequence) {
+ try (final ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
+ final String name = message.getClass().getSimpleName();
+ LOG.log(Level.FINE, "Serializing message: {0}", name);
+
+ final IMessageSerializer serializer = nameToSerializerMap.get(name);
+ if (serializer != null) {
+ serializer.serialize(outputStream, message, sequence);
+ }
+
+ return outputStream.toByteArray();
+ } catch (final Exception e) {
+ throw new RuntimeException("Failure writing message: " + message.getClass().getCanonicalName(), e);
+ }
+ }
+
+ /**
+ * Read a message from the input byte stream and send it to the event handler.
+ * @param messageBytes An array of bytes that contains the message to be deserialized.
+ * @param observer An implementation of the MultiObserver interface which will be called
+ * to process the deserialized message.
+ */
+ public void read(final byte[] messageBytes, final MultiObserver observer) {
+ try (final InputStream inputStream = new ByteArrayInputStream(messageBytes)) {
+ // Binary decoder for both the header and the message.
+ final BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
+
+ // Read the header message.
+ final Header header = headerReader.read(null, decoder);
+ LOG.log(Level.FINE, "Deserializing Avro message: {0}", header.getClassName());
+
+ // Get the appropriate deserializer and deserialize the message.
+ final IMessageDeserializer deserializer = nameToDeserializerMap.get(header.getClassName().toString());
+ if (deserializer != null) {
+ deserializer.deserialize(decoder, observer, header.getSequence());
+ } else {
+ throw new RuntimeException("Request to deserialize unknown message type: " + header.getClassName());
+ }
+
+ } catch (final Exception e) {
+ throw new RuntimeException("Failure reading message: ", e);
+ }
+ }
+}
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/SerializationFactory.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/SerializationFactory.java
new file mode 100644
index 0000000..be3031d
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/SerializationFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.wake.avro;
+
+import org.apache.reef.wake.avro.impl.MessageSerializerImpl;
+import org.apache.reef.wake.avro.impl.MessageDeserializerImpl;
+
+/**
+ * Provides Avro message specific message serializers and deserializers.
+ */
+public final class SerializationFactory {
+
+ private SerializationFactory() {}
+
+ /**
+ * Instantiate an Avro message serializer for the message type in the generic parameter.
+ * @param msgMetaClass The reflection class for the message.
+ * @param <TMessage> The type of the Avro message to be serialized.
+ * @return A reference to an IMessageSerializer interface.
+ */
+ public static <TMessage> IMessageSerializer createSerializer(final Class<TMessage> msgMetaClass) {
+ return new MessageSerializerImpl<>(msgMetaClass);
+ }
+
+ /**
+ * Instantiate an Avro message deserializer for the message type in the generic parameter.
+ * @param msgMetaClass The reflection class for the message.
+ * @param <TMessage> The type of the Avro message to be deserialized.
+ * @return A reference to an IMessageDeserializer interface.
+ */
+ public static <TMessage> IMessageDeserializer createDeserializer(final Class<TMessage> msgMetaClass) {
+ return new MessageDeserializerImpl<>(msgMetaClass);
+ }
+}
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/impl/MessageDeserializerImpl.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/impl/MessageDeserializerImpl.java
new file mode 100644
index 0000000..55d5716
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/impl/MessageDeserializerImpl.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.wake.avro.impl;
+
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.reef.wake.MultiObserver;
+import org.apache.reef.wake.avro.IMessageDeserializer;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+
+/**
+ * Implementation of Avro message specific deserializer.
+ * @param <TMessage> Java type of the message the instantiation can deserialize.
+ */
+public class MessageDeserializerImpl<TMessage> implements IMessageDeserializer {
+ protected Class<TMessage> msgMetaClass;
+ private final SpecificDatumReader<TMessage> messageReader;
+
+ /**
+ * Initialize message specific deserializer.
+ * @param msgMetaClass The reflection class for the message.
+ * @param msgMetaClass
+ */
+ public MessageDeserializerImpl(final Class<TMessage> msgMetaClass) {
+ this.msgMetaClass = msgMetaClass;
+ this.messageReader = new SpecificDatumReader<>(msgMetaClass);
+ }
+
+ /**
+ * Deserialize messages of type TMessage from input decoder.
+ * @param decoder An Avro BinaryDecoder instance that is reading the input stream containing the message.
+ * @param observer An instance of the MultiObserver class that will process the message.
+ * @param sequence A long value which contains the sequence number of the message in the input stream.
+ * @throws IOException Read of input stream in decoder failed.
+ * @throws IllegalAccessException Target method in observer is not accessible.
+ * @throws InvocationTargetException Subclass threw and exception.
+ */
+ public void deserialize(final BinaryDecoder decoder, final MultiObserver observer, final long sequence)
+ throws IOException, IllegalAccessException, InvocationTargetException {
+
+ final TMessage message = messageReader.read(null, decoder);
+ if (message != null) {
+ observer.onNext(sequence, message);
+ } else {
+ throw new RuntimeException("Failed to deserialize message [" + msgMetaClass.getSimpleName() + "]");
+ }
+ }
+}
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/impl/MessageSerializerImpl.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/impl/MessageSerializerImpl.java
new file mode 100644
index 0000000..f225cbb
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/impl/MessageSerializerImpl.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.wake.avro.impl;
+
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.reef.wake.avro.IMessageSerializer;
+import org.apache.reef.wake.avro.message.Header;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+/**
+ * Implementation of Avro message specific serializer.
+ * @param <TMessage> Java type of the message the instantiation can serialize.
+ */
+public class MessageSerializerImpl<TMessage> implements IMessageSerializer {
+ private final String msgMetaClassName;
+ // Writers for header and message.
+ private final DatumWriter<Header> headerWriter = new SpecificDatumWriter<>(Header.class);
+ private final DatumWriter<TMessage> messageWriter;
+
+ /**
+ * Initialize message specific serializer.
+ * @param msgMetaClass The reflection class for the message.
+ */
+ public MessageSerializerImpl(final Class<TMessage> msgMetaClass) {
+ this.msgMetaClassName = msgMetaClass.getSimpleName();
+ this.messageWriter = new SpecificDatumWriter<>(msgMetaClass);
+ }
+
+ /**
+ * Deserialize messages of type TMessage from input outputStream.
+ * @param outputStream A ByteArrayOutputStream where the message to
+ * be serialized will be written.
+ * @param message An Avro message class which implements the Avro SpcificRecord interface.
+ * @param sequence The numerical position of the message in the outgoing message stream.
+ * @throws IOException An error occurred writing the message to the outputStream.
+ */
+ public void serialize(final ByteArrayOutputStream outputStream,
+ final SpecificRecord message, final long sequence) throws IOException {
+ // Binary encoder for both the header and message.
+ final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);
+ // Write the header and the message.
+ headerWriter.write(new Header(sequence, msgMetaClassName), encoder);
+ messageWriter.write((TMessage)message, encoder);
+ encoder.flush();
+ }
+}
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/impl/package-info.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/impl/package-info.java
new file mode 100644
index 0000000..a10a919
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/impl/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Implementations of serializer and derserializer interfaces.
+ */
+package org.apache.reef.wake.avro.impl;
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/package-info.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/package-info.java
new file mode 100644
index 0000000..9423ed9
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Wake avro serialization classes.
+ */
+package org.apache.reef.wake.avro;
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/MultiObserverImpl.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/MultiObserverImpl.java
new file mode 100644
index 0000000..fb3935d
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/MultiObserverImpl.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.wake.impl;
+
+import org.apache.reef.wake.MultiObserver;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * The MultiObserverImpl class uses reflection to discover which onNext()
+ * event processing methods are defined and then map events to them.
+ * @param <TSubCls> The subclass derived from MultiObserverImpl.
+ */
+public abstract class MultiObserverImpl<TSubCls> implements MultiObserver {
+ private static final Logger LOG = Logger.getLogger(MultiObserverImpl.class.getName());
+ private final Map<String, Method> methodMap = new HashMap<>();
+
+ /**
+ * Use reflection to discover all of the event processing methods in TSubCls
+ * and setup a means to direct calls from the generic event onNext method defined
+ * in the MultiObserver interface to specific concrete event onNext methods.
+ */
+ public MultiObserverImpl() {
+ // Iterate across the methods and build a hash map of class names to reflection methods.
+ for (final Method method : this.getClass().getMethods()) {
+ if (method.getName().equals("onNext") && method.getDeclaringClass().equals(this.getClass())) {
+ // This is an onNext method defined in TSubCls
+ final Class<?>[] types = method.getParameterTypes();
+ if (types.length == 2 && types[0].equals(Long.TYPE)) {
+ methodMap.put(types[1].getName(), method);
+ }
+ }
+ }
+ }
+
+ /**
+ * Called when an event is received that does not have an onNext method definition
+ * in TSubCls. Override in TSubClas to handle the error.
+ * @param event A reference to an object which is an event not handled by TSubCls.
+ * @param <TEvent> The type of the event being processed.
+ */
+ private <TEvent> void unimplemented(final long identifier, final TEvent event) {
+ LOG.log(Level.INFO, "Unimplemented event: [{0}]: {1}",
+ new String[]{String.valueOf(identifier), event.getClass().getName()});
+ }
+
+ /**
+ * Generic event onNext method in the base interface which maps the call to a concrete
+ * event onNext method in TSubCls if one exists otherwise unimplemented is invoked.
+ * @param event An event of type TEvent which will be sent to TSubCls as appropriate.
+ * @param <TEvent> The type of the event being processed.
+ */
+ @Override
+ public <TEvent> void onNext(final long identifier, final TEvent event)
+ throws IllegalAccessException, InvocationTargetException {
+
+ // Get the reflection method for this call.
+ final Method onNext = methodMap.get(event.getClass().getName());
+ if (onNext != null) {
+ // Process the event.
+ onNext.invoke((TSubCls) this, identifier, event);
+ } else {
+ // Log the unprocessed event.
+ unimplemented(identifier, event);
+ }
+ }
+}
diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/avro/ProtocolSerializerTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/avro/ProtocolSerializerTest.java
new file mode 100644
index 0000000..31d142a
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/avro/ProtocolSerializerTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.test.avro;
+
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.avro.ProtocolSerializer;
+import org.apache.reef.wake.impl.LoggingEventHandler;
+import org.apache.reef.wake.impl.MultiObserverImpl;
+import org.apache.reef.wake.remote.*;
+import org.apache.reef.wake.remote.address.LocalAddressProvider;
+import org.apache.reef.wake.remote.impl.ByteCodec;
+import org.apache.reef.wake.remote.ports.TcpPortProvider;
+import org.apache.reef.wake.test.avro.message.AvroTestMessage;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Logger;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Verify the protocol serializer can serialize and deserialize messages
+ * exchanged between two remote manager classes.
+ */
+public final class ProtocolSerializerTest {
+ private static final Logger LOG = Logger.getLogger(ProtocolSerializer.class.getName());
+
+ @Rule
+ public final TestName name = new TestName();
+
+ /**
+ * Verify Avro message can be serialized and deserialized
+ * between two remote managers.
+ */
+ @Test
+ public void testProtocolSerializerTest() throws Exception {
+ final int[] numbers = {12, 25};
+ final String[] strings = {"The first string", "The second string"};
+
+ // Queues for storing messages byte messages.
+ final BlockingQueue<byte[]> queue1 = new LinkedBlockingQueue<>();
+ final BlockingQueue<byte[]> queue2 = new LinkedBlockingQueue<>();
+
+ // Remote managers for sending and receiving byte messages.
+ final RemoteManager remoteManager1 = getTestRemoteManager("RemoteManagerOne");
+ final RemoteManager remoteManager2 = getTestRemoteManager("RemoteManagerTwo");
+
+ // Register message handlers for byte level messages.
+ remoteManager1.registerHandler(byte[].class, new ByteMessageObserver(queue1));
+ remoteManager2.registerHandler(byte[].class, new ByteMessageObserver(queue2));
+
+ final EventHandler<byte[]> sender1 = remoteManager1.getHandler(remoteManager2.getMyIdentifier(), byte[].class);
+ final EventHandler<byte[]> sender2 = remoteManager2.getHandler(remoteManager1.getMyIdentifier(), byte[].class);
+
+ final ProtocolSerializer serializer = new ProtocolSerializer("org.apache.reef.wake.test.avro.message");
+
+ sender1.onNext(serializer.write(new AvroTestMessage(numbers[0], strings[0]), 1));
+ sender2.onNext(serializer.write(new AvroTestMessage(numbers[1], strings[1]), 2));
+
+ final AvroMessageObserver avroObserver1 = new AvroMessageObserver();
+ final AvroMessageObserver avroObserver2 = new AvroMessageObserver();
+
+ serializer.read(queue1.take(), avroObserver1);
+ serializer.read(queue2.take(), avroObserver2);
+
+ assertEquals(numbers[0], avroObserver2.getNumber());
+ assertEquals(strings[0], avroObserver2.getDataString());
+
+ assertEquals(numbers[1], avroObserver1.getNumber());
+ assertEquals(strings[1], avroObserver1.getDataString());
+ }
+
+ /**
+ * Build a remote manager on the local IP address with an unused port.
+ * @param identifier The identifier of the remote manager.
+ * @return A RemoteManager instance listing on the local IP address
+ * with a unique port number.
+ */
+ private RemoteManager getTestRemoteManager(final String identifier) throws InjectionException {
+ final int port = 0;
+ final boolean order = true;
+ final int retries = 3;
+ final int timeOut = 10000;
+
+ final Injector injector = Tang.Factory.getTang().newInjector();
+ final LocalAddressProvider localAddressProvider = injector.getInstance(LocalAddressProvider.class);
+ final TcpPortProvider tcpPortProvider = injector.getInstance(TcpPortProvider.class);
+ final RemoteManagerFactory remoteManagerFactory = injector.getInstance(RemoteManagerFactory.class);
+
+ return remoteManagerFactory.getInstance(
+ identifier, localAddressProvider.getLocalAddress(), port, new ByteCodec(),
+ new LoggingEventHandler<Throwable>(), order, retries, timeOut,
+ localAddressProvider, tcpPortProvider);
+ }
+
+ private final class ByteMessageObserver implements EventHandler<RemoteMessage<byte[]>> {
+ private final BlockingQueue<byte[]> queue;
+
+ /**
+ * @param queue Queue where incoming messages will be stored.
+ */
+ ByteMessageObserver(final BlockingQueue<byte[]> queue) {
+ this.queue = queue;
+ }
+
+ /**
+ * Deserialize and direct incoming messages to the registered MuiltiObserver event handler.
+ * @param message A RemoteMessage<byte[]> object which will be deserialized.
+ */
+ public void onNext(final RemoteMessage<byte[]> message) {
+ queue.add(message.getMessage());
+ }
+ }
+
+ /**
+ * Processes messages from the network remote manager.
+ */
+ public final class AvroMessageObserver extends MultiObserverImpl<AvroMessageObserver> {
+ private int number;
+ private String dataString;
+
+ // Accessors
+ int getNumber() {
+ return number;
+ }
+
+ String getDataString() {
+ return dataString;
+ }
+
+ /**
+ * Processes protocol messages from the C# side of the bridge.
+ * @param identifier A long value which is the unique message identifier.
+ * @param message A reference to the received avro test message.
+ */
+ public void onNext(final long identifier, final AvroTestMessage message) {
+ number = message.getNumber();
+ dataString = message.getData().toString();
+ }
+ }
+}
diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/avro/package-info.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/avro/package-info.java
new file mode 100644
index 0000000..63744c7
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/avro/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Tests for org.apache.reef.wake.avro package.
+ */
+package org.apache.reef.wake.test.avro;
diff --git a/pom.xml b/pom.xml
index 02ae63f..9d7ee82 100644
--- a/pom.xml
+++ b/pom.xml
@@ -65,6 +65,7 @@
<jsr305.version>3.0.1</jsr305.version>
<kryo.version>3.0.3</kryo.version>
<kryo-serializers.version>0.37</kryo-serializers.version>
+ <fast-classpath-scanner.version>2.4.5</fast-classpath-scanner.version>
<rootPath>${user.dir}</rootPath>
</properties>