/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| |
| package org.apache.gearpump.transport |
| |
| import scala.collection.immutable.LongMap |
| import scala.concurrent._ |
| |
| import akka.actor._ |
| import akka.agent.Agent |
| import org.slf4j.Logger |
| |
| import org.apache.gearpump.transport.netty.Client.Close |
| import org.apache.gearpump.transport.netty.{TaskMessage, Context} |
| import org.apache.gearpump.util.LogUtil |
| |
/** Resolves a task id to the actor serving that task on the local side. */
trait ActorLookupById {

  /**
   * Lookup actor ref for a local task actor.
   *
   * @param id the task id encoded as a Long (TaskId.toLong)
   * @return the matching actor ref, or None when no actor is known for `id`
   */
  def lookupLocalActor(id: Long): Option[ActorRef]
}
| |
/**
 * Custom networking layer.
 *
 * It translates long sender/receiver addresses to shorter ones to reduce
 * the network overhead.
 *
 * State is held in three agents: local task actors by id, remote addresses by
 * id, and one Netty client actor per remote [[HostPort]]. The Netty server is
 * bound lazily, the first time `init` (or any of `context`, `serverPort`,
 * `localHost`) is accessed.
 */
class Express(val system: ExtendedActorSystem) extends Extension with ActorLookupById {

  // Execution context used by the agents below.
  import system.dispatcher

  import org.apache.gearpump.transport.Express._

  /** Local task actors keyed by task id. */
  val localActorMap = Agent(LongMap.empty[ActorRef])

  /** Remote addresses keyed by id; only read (never updated) within this class. */
  val remoteAddressMap = Agent(Map.empty[Long, HostPort])

  /** Netty client actors keyed by the remote endpoint they connect to. */
  val remoteClientMap = Agent(Map.empty[HostPort, ActorRef])

  val conf = system.settings.config

  /** Components of `init`; accessing any of them triggers the lazy initialization. */
  lazy val (context, serverPort, localHost) = init

  /**
   * Creates the transport context, binds the Netty server and registers a
   * termination hook that closes the context when the actor system stops.
   *
   * Evaluated at most once because it is a lazy val.
   */
  lazy val init = {
    LOG.info(s"Start Express init ...${system.name}")
    val nettyContext = new Context(system, conf)
    val boundPort = nettyContext.bind("netty-server", this)
    // NOTE(review): `.get` throws if the default address carries no host —
    // presumably remoting is always configured here; confirm.
    val hostPort = HostPort(system.provider.getDefaultAddress.host.get, boundPort)
    LOG.info(s"binding to netty server $hostPort")

    system.registerOnTermination(new Runnable {
      override def run(): Unit = nettyContext.close()
    })
    (nettyContext, boundPort, hostPort)
  }

  /** Removes the local actor registered under `id`, if any (applied asynchronously). */
  def unregisterLocalActor(id: Long): Unit = {
    localActorMap.sendOff(_ - id)
  }

  /**
   * Start Netty client actors to connect to remote machines.
   *
   * Makes the set of live clients equal to `hostPorts`: clients for endpoints
   * not in `hostPorts` are sent `Close` and dropped, and a client is connected
   * for every endpoint that does not have one yet. Existing clients for
   * endpoints in `hostPorts` are reused.
   *
   * Everything happens inside a single agent update, so the decision about
   * which clients are stale is taken against the same map that is being
   * modified rather than against an earlier snapshot.
   *
   * @param hostPorts the complete set of remote endpoints that should have a client
   * @return a future completed with the client map after the update was applied
   */
  def startClients(hostPorts: Set[HostPort]): Future[Map[HostPort, ActorRef]] = {
    remoteClientMap.alter { clients =>
      val retained = closeAndRemove(clients, clients.keySet -- hostPorts)
      hostPorts.foldLeft(retained) { (acc, hostPort) =>
        if (acc.contains(hostPort)) {
          acc
        } else {
          acc + (hostPort -> context.connect(hostPort))
        }
      }
    }
  }

  /**
   * Sends `Close` to the clients of the given endpoints and removes them from
   * the client map. Endpoints without a client are ignored.
   *
   * @param hostPorts the remote endpoints whose clients should be closed
   * @return a future completed with the client map after the removal was applied
   */
  def closeClients(hostPorts: Set[HostPort]): Future[Map[HostPort, ActorRef]] = {
    remoteClientMap.alter { clients =>
      closeAndRemove(clients, hostPorts)
    }
  }

  /** Sends `Close` to every client in `clients` whose key is in `hostPorts`; returns the rest. */
  private def closeAndRemove(
      clients: Map[HostPort, ActorRef],
      hostPorts: Set[HostPort]): Map[HostPort, ActorRef] = {
    clients.foreach { case (hostPort, client) =>
      if (hostPorts.contains(hostPort)) {
        client ! Close
      }
    }
    clients -- hostPorts
  }

  /**
   * Registers `actor` as the local actor for `id` (applied asynchronously).
   *
   * Also forces `init`, so the Netty server is bound before the first local
   * actor is registered.
   */
  def registerLocalActor(id: Long, actor: ActorRef): Unit = {
    LOG.info(s"RegisterLocalActor: $id, actor: ${actor.path.name}")
    init
    localActorMap.sendOff(_ + (id -> actor))
  }

  /** Looks up the local actor for `id` in the current value of `localActorMap`. */
  def lookupLocalActor(id: Long): Option[ActorRef] = localActorMap.get().get(id)

  /** Looks up the remote address for `id` in the current value of `remoteAddressMap`. */
  def lookupRemoteAddress(id: Long): Option[HostPort] = remoteAddressMap.get().get(id)

  /**
   * Send message to remote task.
   *
   * @param taskMessage the message to deliver
   * @param remote the endpoint whose client should forward the message
   * @throws Exception if no client is registered for `remote`, i.e.
   *                   `startClients` has not (yet) created one for it
   */
  def transport(taskMessage: TaskMessage, remote: HostPort): Unit = {
    remoteClientMap.get().get(remote) match {
      case Some(client) =>
        client.tell(taskMessage, Actor.noSender)
      case None =>
        val errorMsg = s"Clients has not been launched properly before transporting messages, " +
          s"the destination is $remote"
        LOG.error(errorMsg)
        throw new Exception(errorMsg)
    }
  }
}
| |
/**
 * A customized transport layer by using Akka extension.
 *
 * Serves as the extension id for [[Express]] and holds the logger that the
 * extension imports.
 */
object Express extends ExtensionId[Express] with ExtensionIdProvider {

  /** Logger obtained from LogUtil for this object's class. */
  val LOG: Logger = LogUtil.getLogger(getClass)

  /** Instantiates the extension for the given actor system. */
  override def createExtension(system: ExtendedActorSystem): Express = {
    new Express(system)
  }

  /** The extension id to look up: this object itself. */
  override def lookup: ExtensionId[Express] = this

  /** Delegates to the inherited `get`; the override fixes the return type to Express. */
  override def get(system: ActorSystem): Express = {
    super.get(system)
  }
}