package sample.cluster.client.grpc
import org.apache.pekko.NotUsed
import org.apache.pekko.actor.{ Actor, ActorLogging, ActorRef, ActorSystem, Props, Terminated }
import org.apache.pekko.event.LoggingAdapter
import org.apache.pekko.grpc.GrpcClientSettings
import org.apache.pekko.stream._
import org.apache.pekko.stream.scaladsl.Source
import scala.concurrent.duration._
import scala.concurrent.{ ExecutionContext, Future, Promise }
object ClusterClient {
/**
* Factory method for `ClusterClient` [[org.apache.pekko.actor.Props]].
*/
def props(
settings: ClusterClientSettings)(implicit materializer: Materializer): Props =
Props(new ClusterClient(settings))
sealed trait Command
final case class Send(path: String, msg: Any, localAffinity: Boolean)
extends Command {
/**
* Convenience constructor with `localAffinity` false
*/
def this(path: String, msg: Any) = this(path, msg, localAffinity = false)
}
/**
   * More efficient than `Send` for a single request-reply interaction
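   *
   * The response is piped back to the original sender, so `SendAsk` works with the
   * `ask` pattern. A minimal usage sketch (the actor path, message and timeout are
   * illustrative only; `client` is assumed to be an `ActorRef` of a running
   * `ClusterClient`):
   * {{{
   * import org.apache.pekko.pattern.ask
   * import org.apache.pekko.util.Timeout
   * import scala.concurrent.duration._
   *
   * implicit val timeout: Timeout = Timeout(3.seconds)
   * val reply = client ? ClusterClient.SendAsk("/user/serviceA", "hello", localAffinity = false)
   * }}}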
*/
final case class SendAsk(path: String, msg: Any, localAffinity: Boolean)
extends Command {
/**
* Convenience constructor with `localAffinity` false
*/
def this(path: String, msg: Any) = this(path, msg, localAffinity = false)
}
final case class SendToAll(path: String, msg: Any) extends Command
final case class Publish(topic: String, msg: Any) extends Command
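  // Creates the gRPC client stub for the receptionist service, using the
  // connection settings carried by ClusterClientSettings.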
private def createClientStub(
settings: ClusterClientSettings)(implicit sys: ActorSystem): ClusterClientReceptionistServiceClient = {
ClusterClientReceptionistServiceClient(settings.grpcClientSettings)
}
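  /**
   * Opens a streaming gRPC session towards the receptionist for one original sender.
   * Commands sent to the materialized `Source.actorRef` are serialized to protobuf
   * requests and streamed to the receptionist; responses are deserialized and forwarded
   * back to the original sender. The session ends when the original sender terminates
   * or the shared kill switch is shut down.
   */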
private def newSession(
settings: ClusterClientSettings,
receptionistServiceClient: ClusterClientReceptionistServiceClient,
sender: ActorRef,
killSwitch: SharedKillSwitch,
log: LoggingAdapter,
serialization: ClusterClientSerialization)(implicit mat: Materializer): Future[ActorRef] = {
val sessionReqRefPromise = Promise[ActorRef]()
log.info("New session for {}", sender)
receptionistServiceClient
.newSession(
Source
.actorRef[Any](
bufferSize = settings.bufferSize,
overflowStrategy = OverflowStrategy.dropNew,
// never complete from stream element
completionMatcher = PartialFunction.empty,
// never fail from stream element
failureMatcher = PartialFunction.empty)
.via(killSwitch.flow)
.map {
case send: Send =>
val payload = serialization.serializePayload(send.msg)
Req().withSend(
SendReq(send.path, send.localAffinity, Some(payload)))
case sendToAll: SendToAll =>
val payload = serialization.serializePayload(sendToAll.msg)
Req().withSendToAll(SendToAllReq(sendToAll.path, Some(payload)))
case publish: Publish =>
val payload = serialization.serializePayload(publish.msg)
Req().withPublish(PublishReq(publish.topic, Some(payload)))
}
.mapMaterializedValue(sessionReqRef => {
sessionReqRefPromise.success(sessionReqRef)
NotUsed
}))
.watch(sender) // end session when original sender terminates
.recoverWithRetries(-1,
{
case _: WatchedActorTerminatedException => Source.empty
})
.map { rsp =>
serialization.deserializePayload(rsp.payload.get)
}
.runForeach(sender ! _)
.onComplete { result =>
log.info("Session completed for {} with {}", sender, result)
}(mat.executionContext)
sessionReqRefPromise.future
}
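  /**
   * Single request-reply via the receptionist service's `askSend` call, used for
   * [[SendAsk]]. The response payload is deserialized and returned as a `Future`.
   */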
private def askSend(
receptionistServiceClient: ClusterClientReceptionistServiceClient,
send: SendAsk,
serialization: ClusterClientSerialization)(implicit ec: ExecutionContext): Future[Any] = {
val payload = serialization.serializePayload(send.msg)
val sendReq = SendReq(send.path, send.localAffinity, Some(payload))
receptionistServiceClient.askSend(sendReq).map { rsp =>
serialization.deserializePayload(rsp.payload.get)
}
}
}
/**
 * This actor is intended to be used on an external node that is not a member
 * of the cluster. It acts like a gateway for sending messages to actors
* somewhere in the cluster. With service discovery and Apache Pekko gRPC it will establish
* a connection to a [[ClusterClientReceptionist]] somewhere in the cluster.
*
* You can send messages via the `ClusterClient` to any actor in the cluster
* that is registered in the [[ClusterClientReceptionist]].
* Messages are wrapped in [[ClusterClient#Send]], [[ClusterClient#SendToAll]]
* or [[ClusterClient#Publish]].
*
 * 1. [[ClusterClient#Send]] -
 * The message will be delivered to one recipient with a matching path, if any such
 * exists. If several entries match the path, the message will be delivered
 * to one random destination. The sender of the message can specify that local
 * affinity is preferred, i.e. the message is sent to an actor in the same local actor
 * system as the receptionist actor, if any such exists; otherwise it is sent to a
 * random matching entry.
*
* 2. [[ClusterClient#SendToAll]] -
* The message will be delivered to all recipients with a matching path.
*
* 3. [[ClusterClient#Publish]] -
 * The message will be delivered to all recipient actors that have been registered
 * as subscribers to the named topic.
*
 * Use the factory method [[ClusterClient#props]] to create the
* [[org.apache.pekko.actor.Props]] for the actor.
*
 * If the receptionist is not currently available, the client will buffer the messages
 * and deliver them when the connection to the receptionist has been established.
 * The size of the buffer is configurable, and buffering can be disabled by using a
 * buffer size of 0. When the buffer is full, new messages sent via the client will be
 * dropped (`OverflowStrategy.dropNew`).
*
* Note that this is a best effort implementation: messages can always be lost due to the distributed
* nature of the actors involved.
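 *
 * A minimal usage sketch, assuming `system` is the client-side `ActorSystem`
 * (the actor paths, messages and topic name are illustrative only):
 * {{{
 * implicit val materializer: Materializer = Materializer(system)
 * val client = system.actorOf(
 *   ClusterClient.props(ClusterClientSettings(system)), "client")
 *
 * client ! ClusterClient.Send("/user/serviceA", "hello", localAffinity = true)
 * client ! ClusterClient.SendToAll("/user/serviceB", "hi")
 * client ! ClusterClient.Publish("topic", "news")
 * }}}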
*/
final class ClusterClient(settings: ClusterClientSettings)(
implicit materializer: Materializer) extends Actor
with ActorLogging {
import ClusterClient._
val serialization = new ClusterClientSerialization(context.system)
private val receptionistServiceClient: ClusterClientReceptionistServiceClient =
createClientStub(settings)(context.system)
  // Original sender -> Future of the session stream's Source.actorRef
private var sessionRef: Map[ActorRef, Future[ActorRef]] = Map.empty
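  // Shared kill switch threaded through every session stream; shut down in postStop
  // to tear down all open sessions when this actor stops.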
private val killSwitch = KillSwitches.shared(self.path.name)
override def postStop(): Unit = {
killSwitch.shutdown()
super.postStop()
}
def receive: Receive = {
case send: SendAsk =>
import org.apache.pekko.pattern.pipe
import context.dispatcher
askSend(receptionistServiceClient, send, serialization).pipeTo(sender())
case cmd: Command =>
      // Send, SendToAll or Publish
val originalSender = sender()
val session = sessionRef.get(originalSender) match {
case Some(ses) => ses
case None =>
val ses = newSession(
settings,
receptionistServiceClient,
originalSender,
killSwitch,
log,
serialization)
sessionRef = sessionRef.updated(originalSender, ses)
ses
}
context.watch(originalSender)
import context.dispatcher
session.foreach(_ ! cmd)
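    // The session stream completes on its own via the watch of the original sender
    // in newSession; here we only drop the cached session reference.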
case Terminated(ref) =>
sessionRef -= ref
}
}
object ClusterClientSettings {
/**
* Create settings from the default configuration
* `sample.cluster.client.grpc`.
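   *
   * A minimal configuration sketch (the value shown is an example, not a prescribed
   * default):
   * {{{
   * sample.cluster.client.grpc {
   *   buffer-size = 1000
   * }
   * }}}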
*/
def apply(system: ActorSystem): ClusterClientSettings = {
val config = system.settings.config.getConfig("sample.cluster.client.grpc")
val grpcClientSettings = GrpcClientSettings
// FIXME service discovery
.connectToServiceAt("127.0.0.1", 50051)(system)
.withDeadline(3.second) // FIXME config
.withTls(false)
new ClusterClientSettings(
bufferSize = config.getInt("buffer-size"),
grpcClientSettings)
}
}
final case class ClusterClientSettings(
    bufferSize: Int,
    grpcClientSettings: GrpcClientSettings)