[REEF-1938] Implement network transport for the Java-CLR bridge
This PR is for the Java side of the bridge.
JIRA: [REEF-1938](https://issues.apache.org/jira/browse/REEF-1938)
Pull Request
Closes #1405
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/bridge/NetworkTransport.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/bridge/NetworkTransport.java
new file mode 100644
index 0000000..ab0436c
--- /dev/null
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/bridge/NetworkTransport.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.bridge;
+
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.reef.tang.InjectionFuture;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.MultiObserver;
+import org.apache.reef.wake.avro.ProtocolSerializer;
+import org.apache.reef.wake.remote.RemoteMessage;
+import org.apache.reef.wake.remote.address.LocalAddressProvider;
+import org.apache.reef.wake.remote.impl.ByteCodec;
+import org.apache.reef.wake.remote.impl.SocketRemoteIdentifier;
+import org.apache.reef.wake.remote.RemoteIdentifier;
+import org.apache.reef.wake.remote.RemoteManager;
+import org.apache.reef.wake.remote.RemoteManagerFactory;
+
+import javax.inject.Inject;
+import java.net.InetSocketAddress;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * The CLR Bridge Network class aggregates a RemoteManager and
+ * Protocol Serializer to provide a simple send/receive interface
+ * between the Java and CLR sides of the bridge.
+ */
+public final class NetworkTransport {
+
+ private static final Logger LOG = Logger.getLogger(NetworkTransport.class.getName());
+
+ /** Remote manager to handle java-C# bridge communication. */
+ private final RemoteManager remoteManager;
+
+ private final ProtocolSerializer serializer;
+ private final InetSocketAddress inetSocketAddress;
+ private final InjectionFuture<MultiObserver> messageObserver;
+
+ private EventHandler<byte[]> sender;
+
+ /**
+ * Sends and receives messages between the java bridge and C# bridge.
+ * @param observer A multiobserver instance that will receive all incoming messages.
+ */
+ @Inject
+ private NetworkTransport(
+ final RemoteManagerFactory remoteManagerFactory,
+ final LocalAddressProvider localAddressProvider,
+ final ProtocolSerializer serializer,
+ final InjectionFuture<MultiObserver> observer) {
+
+ LOG.log(Level.FINE, "Java bridge network initializing");
+
+ this.serializer = serializer;
+ this.messageObserver = observer;
+
+ this.remoteManager = remoteManagerFactory.getInstance(
+ "JavaBridgeNetwork", localAddressProvider.getLocalAddress(), 0, new ByteCodec());
+
+ // Get our address and port number.
+ final RemoteIdentifier remoteIdentifier = this.remoteManager.getMyIdentifier();
+ if (remoteIdentifier instanceof SocketRemoteIdentifier) {
+ final SocketRemoteIdentifier socketIdentifier = (SocketRemoteIdentifier)remoteIdentifier;
+ this.inetSocketAddress = socketIdentifier.getSocketAddress();
+ } else {
+ throw new RuntimeException("Identifier is not a SocketRemoteIdentifier: " + remoteIdentifier);
+ }
+
+ // Register as the message handler for any incoming messages.
+ this.remoteManager.registerHandler(byte[].class, new LocalObserver());
+ }
+
+ /**
+ * Sends a message to the C# side of the bridge.
+ * @param message An Avro message class derived from SpecificRecord.
+ * @throws RuntimeException if invoked before initialization is complete.
+ */
+ public void send(final long identifier, final SpecificRecord message) {
+ if (sender != null) {
+ sender.onNext(serializer.write(message, identifier));
+ } else {
+ final String msgClassName = message.getClass().getCanonicalName();
+ LOG.log(Level.SEVERE, "Attempt to send message [{0}] before network is initialized", msgClassName);
+ throw new RuntimeException("NetworkTransport not initialized: failed to send " + msgClassName);
+ }
+ }
+
+ /**
+ * Provides the IP address and port of the java bridge network.
+ * @return A InetSockerAddress that contains the ip and port of the bridge network.
+ */
+ public InetSocketAddress getAddress() {
+ return inetSocketAddress;
+ }
+
+ /**
+ * Processes messages from the network remote manager.
+ */
+ private final class LocalObserver implements EventHandler<RemoteMessage<byte[]>> {
+
+ /**
+ * Deserialize and direct incoming messages to the registered MuiltiObserver event handler.
+ * @param message A RemoteMessage<byte[]> object which will be deserialized.
+ */
+ @Override
+ public void onNext(final RemoteMessage<byte[]> message) {
+ LOG.log(Level.FINEST, "Received remote message: {0}", message);
+
+ if (sender == null) {
+ // Instantiate a network connection to the C# side of the bridge.
+ // THERE COULD BE A SECURITY ISSUE HERE WHERE SOMEONE SPOOFS THE
+ // C# BRIDGE, WE RECEIVE IT FIRST, AND CONNECT TO THE SPOOFER,
+ // THOUGH THE TIME WINDOW IS VERY SMALL.
+ final RemoteIdentifier remoteIdentifier = message.getIdentifier();
+ LOG.log(Level.FINE, "Connecting to: {0}", remoteIdentifier);
+ sender = remoteManager.getHandler(remoteIdentifier, byte[].class);
+ }
+
+ // Deserialize the message and invoke the appropriate processing method.
+ serializer.read(message.getMessage(), messageObserver.get());
+ }
+ }
+}
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/bridge/package-info.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/bridge/package-info.java
new file mode 100644
index 0000000..f2c452e
--- /dev/null
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/bridge/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * The Java-side of the CLR/Java bridge interop.
+ */
+package org.apache.reef.bridge;