blob: af4df9e5e1dfa25b53043e43abc5317a6b884c44 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using Org.Apache.REEF.Common.Io;
using Org.Apache.REEF.Network.Naming.Codec;
using Org.Apache.REEF.Network.Naming.Events;
using Org.Apache.REEF.Network.Naming.Observers;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Utilities.Diagnostics;
using Org.Apache.REEF.Utilities.Logging;
using Org.Apache.REEF.Wake.Remote;
using Org.Apache.REEF.Wake.Remote.Impl;
using Org.Apache.REEF.Wake.RX;
using Org.Apache.REEF.Wake.RX.Impl;
namespace Org.Apache.REEF.Network.Naming
/// <summary>
/// Service that manages names and IPEndpoints for well known hosts.
/// Can register, unregister, and look up IPAddresses using a string identifier.
/// </summary>
public sealed class NameServer : INameServer
private static readonly Logger Logger = Logger.GetLogger(typeof(NameServer));
private readonly TransportServer<NamingEvent> _server;
private readonly Dictionary<string, IPEndPoint> _idToAddrMap;
/// <summary>
/// Create a new NameServer to run on the specified port.
/// </summary>
/// <param name="port">The port to listen for incoming connections on.</param>
/// <param name="addressProvider">The address provider.</param>
/// <param name="tcpPortProvider">If port is 0, this interface provides
/// a port range to try.
/// </param>
private NameServer(
[Parameter(typeof(NamingConfigurationOptions.NameServerPort))] int port,
ILocalAddressProvider addressProvider,
ITcpPortProvider tcpPortProvider)
IObserver<TransportEvent<NamingEvent>> handler = CreateServerHandler();
_idToAddrMap = new Dictionary<string, IPEndPoint>();
ICodec<NamingEvent> codec = CreateServerCodec();
// Start transport server, get listening IP endpoint
Logger.Log(Level.Info, "Starting naming server");
_server = new TransportServer<NamingEvent>(
new IPEndPoint(addressProvider.LocalAddress, port), handler,
codec, tcpPortProvider);
LocalEndpoint = _server.LocalEndpoint;
public IPEndPoint LocalEndpoint { get; private set; }
/// <summary>
/// Looks up the IPEndpoints for each string identifier
/// </summary>
/// <param name="ids">The IDs to look up</param>
/// <returns>A list of Name assignments representing the identifier
/// that was searched for and the mapped IPEndpoint</returns>
public List<NameAssignment> Lookup(List<string> ids)
if (ids == null)
Exceptions.Throw(new ArgumentNullException("ids"), Logger);
return ids.Where(id => _idToAddrMap.ContainsKey(id))
.Select(id => new NameAssignment(id, _idToAddrMap[id]))
/// <summary>
/// Gets all of the registered identifier/endpoint pairs.
/// </summary>
/// <returns>A list of all of the registered identifiers and their
/// mapped IPEndpoints</returns>
public List<NameAssignment> GetAll()
return _idToAddrMap.Select(pair => new NameAssignment(pair.Key, pair.Value)).ToList();
/// <summary>
/// Registers the string identifier with the given IPEndpoint
/// </summary>
/// <param name="id">The string ident</param>
/// <param name="endpoint">The mapped endpoint</param>
public void Register(string id, IPEndPoint endpoint)
if (id == null)
Exceptions.Throw(new ArgumentNullException("id"), Logger);
if (endpoint == null)
Exceptions.Throw(new ArgumentNullException("endpoint"), Logger);
Logger.Log(Level.Info, "Registering id: " + id + ", and endpoint: " + endpoint);
_idToAddrMap[id] = endpoint;
/// <summary>
/// Unregister the given identifier with the NameServer
/// </summary>
/// <param name="id">The identifier to unregister</param>
public void Unregister(string id)
if (id == null)
Exceptions.Throw(new ArgumentNullException("id"), Logger);
Logger.Log(Level.Info, "Unregistering id: " + id);
/// <summary>
/// Stops the NameServer
/// </summary>
public void Dispose()
/// <summary>
/// Create the handler to manage incoming NamingEvent types
/// </summary>
/// <returns>The server handler</returns>
private IObserver<TransportEvent<NamingEvent>> CreateServerHandler()
PubSubSubject<NamingEvent> subject = new PubSubSubject<NamingEvent>();
subject.Subscribe(new NamingLookupRequestObserver(this));
subject.Subscribe(new NamingGetAllRequestObserver(this));
subject.Subscribe(new NamingRegisterRequestObserver(this));
subject.Subscribe(new NamingUnregisterRequestObserver(this));
return new ServerHandler(subject);
/// <summary>
/// Create the codec used to serialize/deserialize NamingEvent messages
/// </summary>
/// <returns>The serialization codec</returns>
private ICodec<NamingEvent> CreateServerCodec()
MultiCodec<NamingEvent> codec = new MultiCodec<NamingEvent>();
codec.Register(new NamingLookupRequestCodec(), "");
codec.Register(new NamingLookupResponseCodec(), "");
NamingRegisterRequestCodec requestCodec = new NamingRegisterRequestCodec();
codec.Register(requestCodec, "");
codec.Register(new NamingRegisterResponseCodec(requestCodec), "");
codec.Register(new NamingUnregisterRequestCodec(), "");
return codec;
[NamedParameter("Port for the NameServer to listen on")]
public class Port : Name<int>
/// <summary>
/// Class used to handle incoming NamingEvent messages.
/// Delegates the event to the prescribed handler depending on its type
/// </summary>
private class ServerHandler : AbstractObserver<TransportEvent<NamingEvent>>
private readonly IObserver<NamingEvent> _handler;
public ServerHandler(IObserver<NamingEvent> handler)
_handler = handler;
public override void OnNext(TransportEvent<NamingEvent> value)
if (value != null && value.Data != null)
NamingEvent message = value.Data;
message.Link = value.Link;
Logger.Log(Level.Warning, "NameServer.OnNext(TransportEvent<NamingEvent> value), message received is null");