// blob: 6971f73027e183b125334af778186d653e6b6a41 [file] [log] [blame]
/*
* Copyright 2015-2016 IBM Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package slack.whisk
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration._
import akka.Done
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ws._
import akka.http.scaladsl.unmarshalling._
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.stream.ActorMaterializer
import akka.stream.OverflowStrategy
import akka.stream.QueueOfferResult
import akka.stream.scaladsl._
import spray.json._
import spray.json.DefaultJsonProtocol._
import java.util.concurrent.atomic.AtomicInteger
/**
 * Command-line entry point of the Slack bot.
 *
 * Opens a WebSocket to the URL returned by `SlackRTM.start`, forwards every
 * strict text frame (parsed as a JSON object) to the registered bots, and lets
 * the bots send JSON messages back through a bounded outgoing queue.
 */
object Main {
  /**
   * Starts the actor system, connects to the WebSocket and then blocks on
   * standard input until the user types "stop".
   *
   * The process shuts itself down when the WebSocket stream terminates
   * (normally or with an error) or when "stop" is entered.
   *
   * @param args command-line arguments; not used
   */
  def main(args: Array[String]): Unit = {
    // Imported locally so that this block does not depend on changes to the file header.
    import scala.util.{Failure, Success, Try}
    import scala.util.control.NonFatal

    implicit val system: ActorSystem = ActorSystem()
    implicit val executionContext = system.dispatcher
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    val config = system.settings.config
    val token = config.getString("slack.bot.token")

    // Blocking is fine here: nothing else can be set up before the session info is available.
    val rtmInfo = Await.result(SlackRTM.start(token), Duration.Inf)
    val botInfo = SlackRTMBotInfo(rtmInfo.selfId)

    // The queue only exists once the stream has been materialized, so it is
    // handed to the senders through a promise.
    val queuePromise = Promise[SourceQueue[Message]]()
    val msgSource = Source.queue[Message](10, OverflowStrategy.dropNew).mapMaterializedValue { q =>
      queuePromise.trySuccess(q)
    }
    val futureQueue = queuePromise.future

    // Every outgoing message gets a fresh, increasing "id" field.
    val outgoingMsgCounter = new AtomicInteger(1)

    // Serializes `msg` (plus its id) and offers it to the outgoing queue; the
    // outcome of the offer is only logged, never reported back to the caller.
    val send: JsObject => Unit = { msg =>
      futureQueue.foreach { q =>
        val msgId = outgoingMsgCounter.getAndIncrement()
        val msgExt = JsObject(msg.fields + ("id" -> JsNumber(msgId)))
        val payload = msgExt.toString
        println("Sending : " + payload)
        q.offer(TextMessage(payload)).onComplete {
          case Success(QueueOfferResult.Enqueued) =>
            // fire-and-forget
          case Success(QueueOfferResult.Dropped) =>
            println("Message was dropped. Queue is full.")
          case Success(QueueOfferResult.QueueClosed) =>
            println("Queue is closed.")
          case Success(QueueOfferResult.Failure(f)) =>
            println("Queue failed: " + f.getMessage)
          case Failure(e) =>
            // The offer itself failed; report it instead of losing the error.
            println("Queue failed: " + e.getMessage)
        }
      }
    }

    val bots: List[SlackBot] = List(
      new ActionRunnerBot(send, botInfo)
    )

    // Logs a failure that happened while handling an incoming frame.
    def reportReceiveError(error: Throwable, message: Message): Unit = {
      println("There was an error receiving messages: " + error.getMessage)
      println("Message was: " + message)
    }

    val msgSink: Sink[Message, Future[Done]] = Sink.foreach {
      case message: TextMessage.Strict =>
        // `Try` only captures non-fatal errors, so fatal ones still propagate.
        Try(JsonParser(message.text).asJsObject) match {
          case Success(parsed) =>
            // Each bot is guarded on its own so that one failing bot does not
            // prevent the remaining bots from seeing the message.
            bots.foreach { bot =>
              try bot.onMessage(parsed)
              catch { case NonFatal(e) => reportReceiveError(e, message) }
            }
          case Failure(e) =>
            reportReceiveError(e, message)
        }
      case x =>
        // Anything that is not a strict text frame is ignored.
        println("Dropping message: " + x)
    }

    val flow: Flow[Message, Message, Future[Done]] =
      Flow.fromSinkAndSourceMat(msgSink, msgSource)(Keep.left)

    val (upgradeResponse, closed) =
      Http().singleWebSocketRequest(WebSocketRequest(rtmInfo.wsUrl), flow)

    val connected = upgradeResponse.map { upgrade =>
      if (upgrade.response.status == StatusCodes.SwitchingProtocols) Done
      else throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
    }
    connected.onComplete(println)

    // Shutdown can be requested both from the stream callback and from the
    // main thread; the flag makes sure the sequence runs only once.
    val shutdownStarted = new java.util.concurrent.atomic.AtomicBoolean(false)

    // Stops the materializer, the HTTP connection pools and the actor system,
    // then exits the JVM. Calls after the first one return immediately.
    def shutItDown(): Unit = {
      if (shutdownStarted.compareAndSet(false, true)) {
        materializer.shutdown()
        Await.ready(Http().shutdownAllConnectionPools(), Duration.Inf)
        system.terminate()
        Await.result(system.whenTerminated, Duration.Inf)
        System.exit(0)
      }
    }

    // React to failures as well as to a clean close: otherwise a broken
    // connection would leave the process running without a socket.
    closed.onComplete { result =>
      // Skip the report when the termination was caused by our own shutdown.
      if (!shutdownStarted.get) {
        result match {
          case Success(_) =>
            println("Closed. Shutting down...")
          case Failure(e) =>
            println("Closed with error: " + e.getMessage + ". Shutting down...")
        }
        shutItDown()
      }
    }

    // Prompts until the user types "stop" (returns true) or standard input
    // reaches end-of-file (returns false). `readLine` yields null at EOF.
    @scala.annotation.tailrec
    def stopRequested(): Boolean = {
      println("Say 'stop' to stop.")
      Option(scala.io.StdIn.readLine()) match {
        case Some(line) if line.trim == "stop" => true
        case Some(_) => stopRequested()
        case None => false
      }
    }

    if (stopRequested()) {
      println("Shutting down...")
      shutItDown()
    } else {
      // No console attached: keep serving and rely on the stream callback for
      // shutdown. NOTE(review): assumes the actor system's threads are
      // non-daemonic so the JVM stays alive after main returns — confirm config.
      println("Standard input closed. Running until the connection closes.")
    }
  }
}