blob: 49893641856944f7d6b1d84cb550569ff3efb163 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.services
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
import akka.actor.ActorSystem
import akka.http.scaladsl.model.headers.{HttpChallenge, HttpCookie, HttpCookiePair}
import akka.http.scaladsl.model.{RemoteAddress, StatusCodes, Uri}
import akka.http.scaladsl.server.AuthenticationFailedRejection.{CredentialsMissing, CredentialsRejected}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server._
import akka.http.scaladsl.server.directives.FormFieldDirectives.FieldMagnet
import akka.stream.Materializer
import com.typesafe.config.Config
import com.softwaremill.session.SessionDirectives._
import com.softwaremill.session.SessionOptions._
import com.softwaremill.session.{MultiValueSessionSerializer, SessionConfig, SessionManager}
import upickle.default.write
import org.apache.gearpump.security.{Authenticator => BaseAuthenticator}
import org.apache.gearpump.services.SecurityService.{User, UserSession}
import org.apache.gearpump.services.security.oauth2.OAuth2Authenticator
import org.apache.gearpump.util.{Constants, LogUtil}
// NOTE: This cannot be removed!!!
import org.apache.gearpump.services.util.UpickleUtil._
/**
* Security authentication endpoint.
*
* - When user cannot be authenticated, will reject with 401 AuthenticationFailedRejection
* - When user can be authenticated, but are not authorized to access certail resource, will
* return a 405 AuthorizationFailedRejection.
* - When web UI frontend receive 401, it should redirect the UI to login page.
* - When web UI receive 405,it should display errors like
* "current user is not authorized to access this resource."
*
* The Authenticator used is pluggable, the current Authenticator is resolved by looking up
* config path [[org.apache.gearpump.util.Constants.GEARPUMP_UI_AUTHENTICATOR_CLASS]].
*
* See [[org.apache.gearpump.security.Authenticator]] to find more info on custom Authenticator.
*/
class SecurityService(inner: RouteService, implicit val system: ActorSystem) extends RouteService {
// Use scheme "GearpumpBasic" to avoid popping up web browser native authentication box.
private val challenge = HttpChallenge(scheme = "GearpumpBasic", realm = Some("gearpump"),
params = Map.empty[String, String])
val LOG = LogUtil.getLogger(getClass, "AUDIT")
private val config = system.settings.config
private val sessionConfig = SessionConfig.fromConfig(config)
private implicit val sessionManager: SessionManager[UserSession] =
new SessionManager[UserSession](sessionConfig)
private val authenticator = {
val clazz = Class.forName(config.getString(Constants.GEARPUMP_UI_AUTHENTICATOR_CLASS))
val constructor = clazz.getConstructor(classOf[Config])
val authenticator = constructor.newInstance(config).asInstanceOf[BaseAuthenticator]
authenticator
}
private def configToMap(config: Config, path: String) = {
import scala.collection.JavaConverters._
config.getConfig(path).root.unwrapped.asScala.toMap map { case (k, v) => k -> v.toString }
}
private val oauth2Providers: Map[String, String] = {
if (config.getBoolean(Constants.GEARPUMP_UI_OAUTH2_AUTHENTICATOR_ENABLED)) {
val map = configToMap(config, Constants.GEARPUMP_UI_OAUTH2_AUTHENTICATORS)
map.keys.toList.map { key =>
val iconPath = config.getString(s"${Constants.GEARPUMP_UI_OAUTH2_AUTHENTICATORS}.$key.icon")
(key, iconPath)
}.toMap
} else {
Map.empty[String, String]
}
}
private def authenticate(user: String, pass: String)(implicit ec: ExecutionContext)
: Future[Option[UserSession]] = {
authenticator.authenticate(user, pass, ec).map { result =>
if (result.authenticated) {
Some(UserSession(user, result.permissionLevel))
} else {
None
}
}
}
private def rejectMissingCredentials: Route = {
reject(AuthenticationFailedRejection(CredentialsMissing, challenge))
}
private def rejectWrongCredentials: Route = {
reject(AuthenticationFailedRejection(CredentialsRejected, challenge))
}
private def requireAuthentication(inner: UserSession => Route): Route = {
optionalSession(oneOff, usingCookiesOrHeaders) { sessionOption =>
sessionOption match {
case Some(session) => {
inner(session)
}
case None =>
rejectMissingCredentials
}
}
}
private def login(session: UserSession, ip: String, redirectToRoot: Boolean = false): Route = {
setSession(oneOff, usingCookies, session) {
val user = session.user
// Default: 1 day
val maxAgeMs = 1000 * sessionConfig.sessionMaxAgeSeconds.getOrElse(24 * 3600L)
setCookie(HttpCookie.fromPair(HttpCookiePair("username", user), path = Some("/"),
maxAge = Some(maxAgeMs))) {
LOG.info(s"user $user login from $ip")
if (redirectToRoot) {
redirect(Uri("/"), StatusCodes.TemporaryRedirect)
} else {
complete(write(new User(user)))
}
}
}
}
private def logout(user: UserSession, ip: String): Route = {
invalidateSession(oneOff, usingCookies) { ctx =>
LOG.info(s"user ${user.user} logout from $ip")
ctx.complete(write(new User(user.user)))
}
}
// Only admin are able to access operation like post/delete/put
private def requireAuthorization(user: UserSession, route: => Route): Route = {
// Valid user
if (user.permissionLevel >= BaseAuthenticator.User.permissionLevel) {
route
} else {
// Possibly a guest or not authenticated.
(put | delete | post) {
// Reject with 405 authorization error
reject(AuthorizationFailedRejection)
} ~
get {
route
}
}
}
private val unknownIp: Directive1[RemoteAddress] = {
Directive[Tuple1[RemoteAddress]]{ inner =>
inner(new Tuple1(RemoteAddress.Unknown))
}
}
override val route: Route = {
extractExecutionContext{implicit ec: ExecutionContext =>
extractMaterializer{implicit mat: Materializer =>
(extractClientIP | unknownIp) { ip =>
pathPrefix("login") {
pathEndOrSingleSlash {
get {
getFromResource("login/login.html")
} ~
post {
// Guest account don't have permission to submit new application in UI
formField(FieldMagnet('username.as[String])) {user: String =>
formFields(FieldMagnet('password.as[String])) {pass: String =>
val result = authenticate(user, pass)
onSuccess(result) {
case Some(session) =>
login(session, ip.toString)
case None =>
rejectWrongCredentials
}
}
}
}
} ~
path ("oauth2" / "providers") {
// Responds with a list of OAuth2 providers.
complete(write(oauth2Providers))
} ~
// Support OAUTH Authentication
pathPrefix ("oauth2"/ Segment) {providerName =>
// Resolve OAUTH Authentication Provider
val oauthService = OAuth2Authenticator.get(config, providerName, ec)
if (oauthService == null) {
// OAuth2 is disabled.
complete(StatusCodes.NotFound)
} else {
def loginWithOAuth2Parameters(parameters: Map[String, String]): Route = {
val result = oauthService.authenticate(parameters)
onComplete(result) {
case Success(session) =>
login(session, ip.toString, redirectToRoot = true)
case Failure(ex) => {
LOG.info(s"Failed to login user from ${ip.toString}", ex)
rejectWrongCredentials
}
}
}
path ("authorize") {
// Redirects to OAuth2 service provider for authorization.
redirect(Uri(oauthService.getAuthorizationUrl), StatusCodes.TemporaryRedirect)
} ~
path ("accesstoken") {
post {
// Guest account don't have permission to submit new application in UI
formField(FieldMagnet('accesstoken.as[String])) {accesstoken: String =>
loginWithOAuth2Parameters(Map("accesstoken" -> accesstoken))
}
}
} ~
path("callback") {
// Login with authorization code or access token.
parameterMap {parameters =>
loginWithOAuth2Parameters(parameters)
}
}
}
}
} ~
path("logout") {
post {
requireAuthentication {session =>
logout(session, ip.toString())
}
}
} ~
requireAuthentication {user =>
requireAuthorization(user, inner.route)
}
}}}
}
}
object SecurityService {
val SESSION_MANAGER_KEY = "akka.http.session.server-secret"
case class UserSession(user: String, permissionLevel: Int)
object UserSession {
private val User = "user"
private val PermissionLevel = "permissionLevel"
implicit def serializer: MultiValueSessionSerializer[UserSession] = {
new MultiValueSessionSerializer[UserSession](
toMap = {t: UserSession =>
Map(User -> t.user, PermissionLevel -> t.permissionLevel.toString)
},
fromMap = {m: Map[String, String] =>
if (m.contains(User)) {
Try(UserSession(m(User), m(PermissionLevel).toInt))
} else {
Failure[UserSession](new Exception("Fail to parse session "))
}
}
)
}
}
case class User(user: String)
}