Gearpump integration for RabbitMQ
RMQSink can handle the following message types:
Suppose a DataSource task outputs the above-mentioned messages. You can then write a simple application:
    // "$sinkNum" and "$splitNum" are placeholders for the parallelism of each processor
    val sink = new RMQSink(UserConfig.empty)
    val sinkProcessor = DataSinkProcessor(sink, "$sinkNum")
    val split = Processor[DataSource]("$splitNum")
    val computation = split ~> sinkProcessor
    val application = StreamApplication("RabbitMQ", Graph(computation), UserConfig.empty)
To initialize an RMQSink instance, we need a UserConfig object that provides the config items listed below:
rabbitmq.queue.name
: the name of the RabbitMQ queue to sink messages to
rabbitmq.connection.host
: the RabbitMQ server host
rabbitmq.connection.port
: the RabbitMQ server port; the default is 5672
rabbitmq.connection.uri
: the connection URI, following the pattern amqp://userName:password@hostName:portNumber/virtualHost
rabbitmq.virtualhost
: the virtual host, a logical domain within the RabbitMQ server
rabbitmq.auth.username
: the user name for authorization
rabbitmq.auth.password
: the password for authorization
rabbitmq.automatic.recovery
: set to true to enable automatic recovery, false otherwise
rabbitmq.connection.timeout
: the connection timeout
rabbitmq.network.recovery.internal
: the network recovery interval
rabbitmq.requested.heartbeat
: set to true to enable heartbeats, false otherwise
rabbitmq.topology.recoveryenabled
: set to true to enable topology recovery, false otherwise
rabbitmq.channel.max
: the maximum number of channels
rabbitmq.frame.max
: the maximum frame size

More details: https://www.rabbitmq.com/admin-guide.html
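
As a rough illustration of how a few of the config items above fit together, the sketch below builds a connection URI following the documented pattern and collects some settings in a plain Scala Map. The `RabbitMqSettings` object, its methods, and the queue name `events` are hypothetical and not part of Gearpump or the RabbitMQ client. The sketch also assumes every value can be passed as a string, which may not match how RMQSink actually reads each key. The entries could then be copied into a UserConfig, for instance with `withString`, under that same assumption.

```scala
import java.net.URLEncoder

// Hypothetical helper for illustration only; not part of Gearpump or the RabbitMQ client.
object RabbitMqSettings {

  // Percent-encode one URI component. URLEncoder emits "+" for spaces,
  // so map it to "%20", which is what a URI expects.
  private def encode(component: String): String =
    URLEncoder.encode(component, "UTF-8").replace("+", "%20")

  // Builds a URI following amqp://userName:password@hostName:portNumber/virtualHost.
  // The default virtual host "/" is percent-encoded as "%2F".
  def connectionUri(
      userName: String,
      password: String,
      hostName: String,
      portNumber: Int = 5672, // default port named in the list above
      virtualHost: String = "/"): String =
    s"amqp://${encode(userName)}:${encode(password)}@$hostName:$portNumber/${encode(virtualHost)}"

  // Collects a few of the documented keys as plain strings (assumed value format).
  def forQueue(queueName: String, uri: String): Map[String, String] =
    Map(
      "rabbitmq.queue.name" -> queueName,
      "rabbitmq.connection.uri" -> uri,
      "rabbitmq.automatic.recovery" -> "true"
    )

  def main(args: Array[String]): Unit = {
    val uri = connectionUri("guest", "guest", "localhost")
    // "events" is a made-up queue name.
    forQueue("events", uri).foreach { case (key, value) => println(s"$key = $value") }
  }
}
```

Encoding the user name, password, and virtual host separately keeps characters such as `@` or `/` inside those values from being mistaken for URI delimiters.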