blob: ab0436c2cf895d3deabb3d4e2c2726143af38d8a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.reef.bridge;
import org.apache.avro.specific.SpecificRecord;
import org.apache.reef.tang.InjectionFuture;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.MultiObserver;
import org.apache.reef.wake.avro.ProtocolSerializer;
import org.apache.reef.wake.remote.RemoteMessage;
import org.apache.reef.wake.remote.address.LocalAddressProvider;
import org.apache.reef.wake.remote.impl.ByteCodec;
import org.apache.reef.wake.remote.impl.SocketRemoteIdentifier;
import org.apache.reef.wake.remote.RemoteIdentifier;
import org.apache.reef.wake.remote.RemoteManager;
import org.apache.reef.wake.remote.RemoteManagerFactory;
import javax.inject.Inject;
import java.net.InetSocketAddress;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* The CLR Bridge Network class aggregates a RemoteManager and
* Protocol Serializer to provide a simple send/receive interface
* between the Java and CLR sides of the bridge.
*/
public final class NetworkTransport {

  private static final Logger LOG = Logger.getLogger(NetworkTransport.class.getName());

  /** Remote manager to handle java-C# bridge communication. */
  private final RemoteManager remoteManager;

  /** Converts Avro messages to and from the byte arrays carried by the remote manager. */
  private final ProtocolSerializer serializer;

  /** Socket address of the remote manager created in the constructor. */
  private final InetSocketAddress inetSocketAddress;

  /** Future for the observer that receives every deserialized incoming message. */
  private final InjectionFuture<MultiObserver> messageObserver;

  /**
   * Handler used to send bytes to the C# side; {@code null} until the first
   * incoming message has been received. It is assigned by the incoming-message
   * handler and read by {@link #send(long, SpecificRecord)}, so it is declared
   * volatile to make the assignment visible to callers on other threads.
   */
  private volatile EventHandler<byte[]> sender;

  /**
   * Sends and receives messages between the java bridge and C# bridge.
   * Creates a remote manager named "JavaBridgeNetwork" on the local address,
   * records its socket address, and registers a handler for incoming messages.
   * @param remoteManagerFactory Factory used to create the remote manager.
   * @param localAddressProvider Supplies the local address the remote manager is created with.
   * @param serializer Serializer used to write outgoing and read incoming messages.
   * @param observer A multiobserver instance that will receive all incoming messages.
   * @throws RuntimeException if the remote manager's identifier is not a SocketRemoteIdentifier.
   */
  @Inject
  private NetworkTransport(
      final RemoteManagerFactory remoteManagerFactory,
      final LocalAddressProvider localAddressProvider,
      final ProtocolSerializer serializer,
      final InjectionFuture<MultiObserver> observer) {

    LOG.log(Level.FINE, "Java bridge network initializing");

    this.serializer = serializer;
    this.messageObserver = observer;

    // NOTE(review): port 0 presumably asks for any free port - confirm against RemoteManagerFactory.
    this.remoteManager = remoteManagerFactory.getInstance(
        "JavaBridgeNetwork", localAddressProvider.getLocalAddress(), 0, new ByteCodec());

    // Get our address and port number.
    final RemoteIdentifier remoteIdentifier = this.remoteManager.getMyIdentifier();
    if (remoteIdentifier instanceof SocketRemoteIdentifier) {
      final SocketRemoteIdentifier socketIdentifier = (SocketRemoteIdentifier) remoteIdentifier;
      this.inetSocketAddress = socketIdentifier.getSocketAddress();
    } else {
      throw new RuntimeException("Identifier is not a SocketRemoteIdentifier: " + remoteIdentifier);
    }

    // Register as the message handler for any incoming messages.
    this.remoteManager.registerHandler(byte[].class, new LocalObserver());
  }

  /**
   * Sends a message to the C# side of the bridge.
   * @param identifier Identifier passed to the serializer together with the message.
   * @param message An Avro message class derived from SpecificRecord.
   * @throws RuntimeException if invoked before initialization is complete,
   * i.e. before the first message from the C# side has been received.
   */
  public void send(final long identifier, final SpecificRecord message) {
    // Read the volatile field once so the null check and the call use the same handler.
    final EventHandler<byte[]> currentSender = this.sender;
    if (currentSender != null) {
      currentSender.onNext(serializer.write(message, identifier));
    } else {
      final String msgClassName = message.getClass().getCanonicalName();
      LOG.log(Level.SEVERE, "Attempt to send message [{0}] before network is initialized", msgClassName);
      throw new RuntimeException("NetworkTransport not initialized: failed to send " + msgClassName);
    }
  }

  /**
   * Provides the IP address and port of the java bridge network.
   * @return An InetSocketAddress that contains the ip and port of the bridge network.
   */
  public InetSocketAddress getAddress() {
    return inetSocketAddress;
  }

  /**
   * Processes messages from the network remote manager.
   */
  private final class LocalObserver implements EventHandler<RemoteMessage<byte[]>> {

    /**
     * Deserialize and direct incoming messages to the registered MultiObserver event handler.
     * On the first message, also obtains the handler used to send replies to the message's origin.
     * @param message A RemoteMessage<byte[]> object which will be deserialized.
     */
    @Override
    public void onNext(final RemoteMessage<byte[]> message) {
      LOG.log(Level.FINEST, "Received remote message: {0}", message);

      if (sender == null) {
        // Instantiate a network connection to the C# side of the bridge.
        // THERE COULD BE A SECURITY ISSUE HERE WHERE SOMEONE SPOOFS THE
        // C# BRIDGE, WE RECEIVE IT FIRST, AND CONNECT TO THE SPOOFER,
        // THOUGH THE TIME WINDOW IS VERY SMALL.
        final RemoteIdentifier remoteIdentifier = message.getIdentifier();
        LOG.log(Level.FINE, "Connecting to: {0}", remoteIdentifier);
        sender = remoteManager.getHandler(remoteIdentifier, byte[].class);
      }

      // Deserialize the message and invoke the appropriate processing method.
      serializer.read(message.getMessage(), messageObserver.get());
    }
  }
}