| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * https://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.nlpcraft.server.user |
| |
| import java.util.{Timer, TimerTask} |
| import io.opencensus.trace.Span |
| import org.apache.commons.validator.routines.EmailValidator |
| import org.apache.ignite.{IgniteAtomicSequence, IgniteCache, IgniteSemaphore, IgniteState, Ignition} |
| import org.apache.nlpcraft.common.blowfish.NCBlowfishHasher |
| import org.apache.nlpcraft.common.config.NCConfigurable |
| import org.apache.nlpcraft.common.{NCService, _} |
| import org.apache.nlpcraft.server.ignite.NCIgniteHelpers._ |
| import org.apache.nlpcraft.server.ignite.NCIgniteInstance |
| import org.apache.nlpcraft.server.mdo.NCUserMdo |
| import org.apache.nlpcraft.server.sql.{NCSql, NCSqlManager} |
| import org.apache.nlpcraft.server.tx.NCTxManager |
| |
| import scala.jdk.CollectionConverters.IterableHasAsScala |
| import scala.util.control.Exception._ |
| |
| /** |
| * User CRUD manager. |
| */ |
| object NCUserManager extends NCService with NCIgniteInstance { |
| // Static email validator. |
| private final val EMAIL_VALIDATOR = EmailValidator.getInstance() |
| |
| // Caches. |
| @volatile private var tokenSigninCache: IgniteCache[String, SigninSession] = _ |
| @volatile private var idSigninCache: IgniteCache[Long, Set[String]] = _ |
| |
| @volatile private var usersSeq: IgniteAtomicSequence = _ |
| @volatile private var pwdSeq: IgniteAtomicSequence = _ |
| @volatile private var userLock: IgniteSemaphore = _ |
| |
| // Access token timeout scanner. |
| @volatile private var scanner: Timer = _ |
| |
| // Session holder. |
| private case class SigninSession( |
| acsToken: String, |
| userId: Long, |
| signinMs: Long, |
| lastAccessMs: Long |
| ) |
| |
| private object Config extends NCConfigurable { |
| final private val pre = "nlpcraft.server.user" |
| |
| def pwdPoolBlowup: Int = getInt(s"$pre.pwdPoolBlowup") |
| def timeoutScannerFreqMins: Int = getInt(s"$pre.timeoutScannerFreqMins") |
| def accessTokenExpireTimeoutMins: Int = getInt(s"$pre.accessTokenExpireTimeoutMins") |
| |
| def scannerMs: Int = timeoutScannerFreqMins * 60 * 1000 |
| def expireMs: Int = accessTokenExpireTimeoutMins * 60 * 1000 |
| |
| /** |
| * |
| */ |
| def check(): Unit = { |
| if (pwdPoolBlowup <= 1) |
| throw new NCE(s"Configuration parameter must be > 1 [" + |
| s"name=$pre.pwdPoolBlowup, " + |
| s"value=$pwdPoolBlowup" + |
| s"]") |
| if (timeoutScannerFreqMins <= 0) |
| throw new NCE(s"Configuration parameter must be > 0 [" + |
| s"name=$pre.timeoutScannerFreqMins, " + |
| s"value=$timeoutScannerFreqMins" + |
| s"]") |
| if (accessTokenExpireTimeoutMins <= 0) |
| throw new NCE(s"Configuration parameter must be > 0 [" + |
| s"name=$pre.accessTokenExpireTimeoutMins, " + |
| s"value=$accessTokenExpireTimeoutMins" + |
| s"]") |
| } |
| } |
| |
| Config.check() |
| |
| /** |
| * |
| * @param parent Optional parent span. |
| */ |
| override def stop(parent: Span): Unit = startScopedSpan("stop", parent) { _ => |
| ackStopping() |
| |
| if (scanner != null) |
| scanner.cancel() |
| |
| scanner = null |
| |
| tokenSigninCache = null |
| idSigninCache = null |
| |
| ackStopped() |
| } |
| |
| /** |
| * |
| * @param parent Optional parent span. |
| * @return |
| */ |
| override def start(parent: Span = null): NCService = startScopedSpan("start", parent) { span => |
| ackStarting() |
| |
| addTags( |
| span, |
| "pwdPoolBlowup" -> Config.pwdPoolBlowup, |
| "timeoutScannerFreqMins" -> Config.timeoutScannerFreqMins, |
| "accessTokenExpireTimeoutMins" -> Config.accessTokenExpireTimeoutMins |
| ) |
| |
| catching(wrapIE) { |
| usersSeq = NCSql.mkSeq(ignite, "usersSeq", "nc_user", "id") |
| pwdSeq = NCSql.mkSeq(ignite, "pwdSeq", "passwd_pool", "id") |
| |
| tokenSigninCache = ignite.cache[String, SigninSession]("user-token-signin-cache") |
| idSigninCache = ignite.cache[Long, Set[String]]("user-id-signin-cache") |
| |
| require(tokenSigninCache != null) |
| require(idSigninCache != null) |
| } |
| |
| scanner = new Timer("timeout-scanner") |
| |
| scanner.scheduleAtFixedRate( |
| new TimerTask() { |
| def run(): Unit = { |
| // This doesn't 100% guarantee that we won't run into a race condition |
| // with the shutdown hook on Ignite. |
| if (Ignition.state() == IgniteState.STARTED) |
| try { |
| val now = U.nowUtcMs() |
| |
| // Check access tokens for expiration. |
| catching(wrapIE) { |
| NCTxManager.startTx { |
| for (ses <- tokenSigninCache.asScala.map(_.getValue) |
| if now - ses.lastAccessMs >= Config.expireMs |
| ) { |
| tokenSigninCache -= ses.acsToken |
| clearSigninCache(ses) |
| |
| logger.trace(s"Access token timed out: ${ses.acsToken}") |
| } |
| } |
| } |
| } |
| catch { |
| case e: IllegalStateException => |
| // Attempt to hide possible race condition with Ignite on a shutdown. |
| if (!e.getMessage.startsWith("Grid is in invalid state to perform this operation")) |
| U.prettyError(logger,"Error during timeout scanner process:", e) |
| |
| case e: Throwable => |
| U.prettyError(logger,"Error during timeout scanner process:", e) |
| } |
| } |
| }, |
| Config.scannerMs, |
| Config.scannerMs |
| ) |
| |
| userLock = ignite.semaphore("userSemaphore", 1, true, true) |
| |
| ackStarted() |
| } |
| |
| /** |
| * Gets the list of all current users for given company ID. |
| * |
| * @param compId Company ID. |
| * @param parent Optional parent span. |
| */ |
| @throws[NCE] |
| def getAllUsers(compId: Long, parent: Span = null): Seq[NCUserMdo] = |
| NCSql.sql { |
| NCSqlManager.getAllUsers(compId, parent) |
| } |
| |
| /** |
| * Gets flag which indicates there are another admin users in the system or not. |
| * |
| * @param id User ID. |
| * @param parent Optional parent span. |
| */ |
| @throws[NCE] |
| def isOtherAdminsExist(id: Long, parent: Span = null): Boolean = |
| NCSql.sql { |
| NCSqlManager.isOtherAdminsExist(id, parent) |
| } |
| |
| /** |
| * |
| * @param ses |
| */ |
| private def clearSession(ses: SigninSession): Unit = { |
| clearSigninCache(ses) |
| |
| logger.info(s"User ID signed out: ${ses.userId}") |
| } |
| |
| /** |
| * |
| * @param acsTok Access token to sign out. |
| * @param parent Optional parent span. |
| */ |
| @throws[NCE] |
| def signout(acsTok: String, parent: Span = null): Unit = |
| startScopedSpan("signout", parent, "acsTok" -> acsTok) { _ => |
| catching(wrapIE) { |
| NCTxManager.startTx { |
| tokenSigninCache -== acsTok match { |
| case Some(ses) => clearSession(ses) |
| case None => // No-op. |
| } |
| } |
| } |
| } |
| |
| /** |
| * |
| * @param userId User ID. |
| * @param parent Optional parent span. |
| */ |
| @throws[NCE] |
| def signoutAllSessions(userId: Long, parent: Span = null): Unit = |
| startScopedSpan("signout", parent, "userId" -> userId) { _ => |
| catching(wrapIE) { |
| NCTxManager.startTx { |
| idSigninCache -== userId match { |
| case Some(acsToks) => |
| acsToks.foreach(acsTok => |
| tokenSigninCache -== acsTok match { |
| case Some(ses) => clearSession(ses) |
| case None => // No-op. |
| } |
| ) |
| |
| case None => // No-op. |
| } |
| } |
| } |
| } |
| |
| /** |
| * Gets user ID associated with active access token, if any. |
| * |
| * @param acsTok Access token. |
| * @param parent Optional parent span. |
| * @return |
| */ |
| @throws[NCE] |
| def getUserForAccessToken(acsTok: String, parent: Span = null): Option[NCUserMdo] = |
| startScopedSpan("getUserForAccessToken", parent, "acsTok" -> acsTok) { span => |
| getUserIdForAccessToken(acsTok, span).flatMap(getUserById(_, span)) |
| } |
| |
| /** |
| * Gets user ID associated with active access token, if any. |
| * |
| * @param acsTok Access token. |
| * @param parent Optional parent span. |
| * @return |
| */ |
| @throws[NCE] |
| def getUserIdForAccessToken(acsTok: String, parent: Span = null): Option[Long] = |
| startScopedSpan("getUserIdForAccessToken", parent, "acsTok" -> acsTok) { _ => |
| catching(wrapIE) { |
| tokenSigninCache(acsTok) match { |
| case Some(ses) => |
| val now = U.nowUtcMs() |
| |
| // Update login session. |
| tokenSigninCache += acsTok -> SigninSession(acsTok, ses.userId, ses.signinMs, now) |
| |
| Some(ses.userId) // Bingo! |
| |
| case None => None |
| } |
| } |
| } |
| |
| /** |
| * Gets user for given user ID. |
| * |
| * @param id User ID. |
| * @param parent Optional parent span. |
| */ |
| @throws[NCE] |
| def getUserById(id: Long, parent: Span = null): Option[NCUserMdo] = |
| startScopedSpan("getUser", parent, "usrId" -> id) { span => |
| NCSql.sql { |
| NCSqlManager.getUserById(id, span) |
| } |
| } |
| |
| /** |
| * |
| * @param email User email (as username). |
| * @param passwd User password. |
| * @param parent Optional parent span. |
| * @return |
| */ |
| @throws[NCE] |
| def signin(email: String, passwd: String, parent: Span = null): Option[String] = |
| startScopedSpan("signin", parent, "email" -> email) { span => |
| catching(wrapIE) { |
| NCTxManager.startTx { |
| NCSql.sql { |
| NCSqlManager.getUserByEmail(email, span) |
| } match { |
| case Some(usr) => |
| require(usr.passwordSalt.isDefined) |
| |
| NCSql.sql { |
| if (!NCSqlManager.isKnownPasswordHash(NCBlowfishHasher.hash(passwd, usr.passwordSalt.get), span)) |
| None |
| else { |
| val acsTkn = U.genGuid() |
| val now = U.nowUtcMs() |
| |
| tokenSigninCache += acsTkn -> SigninSession(acsTkn, usr.id, now, now) |
| |
| idSigninCache(usr.id) match { |
| case Some(toks) => idSigninCache += usr.id -> (toks ++ Set(acsTkn)) |
| case None => idSigninCache += usr.id -> Set(acsTkn) |
| } |
| |
| require(usr.email.isDefined && usr.firstName.isDefined && usr.lastName.isDefined) |
| |
| logger.info(s"User signed in [" + |
| s"id=${usr.id}, " + |
| s"email=${usr.email.get}, " + |
| s"name=${usr.firstName.get} ${usr.lastName.get}" + |
| s"]") |
| |
| Some(acsTkn) |
| } |
| } |
| case None => None |
| } |
| } |
| } |
| } |
| |
| /** |
| * |
| * @param id |
| * @param firstName |
| * @param lastName |
| * @param avatarUrl |
| * @param props |
| * @param parent Optional parent span. |
| * @return |
| */ |
| @throws[NCE] |
| def updateUser( |
| id: Long, |
| firstName: String, |
| lastName: String, |
| avatarUrl: Option[String], |
| props: Option[String], |
| parent: Span = null |
| ): Unit = |
| startScopedSpan("updateUser", parent, "usrId" -> id) { span => |
| NCSql.sql { |
| if (NCSqlManager.updateUser(id, firstName, lastName, avatarUrl, props, span) != 1) |
| throw new NCE(s"Unknown user ID: $id") |
| } |
| } |
| |
| /** |
| * |
| * @param id |
| * @param isAdmin |
| * @param parent Optional parent span. |
| * @return |
| */ |
| @throws[NCE] |
| def updateUserPermissions(id: Long, isAdmin: Boolean, parent: Span = null): Unit = |
| startScopedSpan("updateUserPermissions", parent, "usrId" -> id, "isAdmin" -> isAdmin) { span => |
| NCSql.sql { |
| if (NCSqlManager.updateUserAdmin(id, isAdmin, span) != 1) |
| throw new NCE(s"Unknown user ID: $id") |
| } |
| } |
| |
| /** |
| * |
| * @param id |
| * @param parent Optional parent span. |
| * @return |
| */ |
| @throws[NCE] |
| def deleteUser(id: Long, parent: Span = null): Unit = |
| startScopedSpan("deleteUser", parent, "usrId" -> id) { span => |
| NCSql.sql { |
| if (NCSqlManager.deleteUser(id, span) != 1) |
| throw new NCE(s"Unknown user ID: $id") |
| } |
| } |
| |
| /** |
| * |
| * @param id ID of the user to reset password for. |
| * @param newPasswd New password to set. |
| * @param parent Optional parent span. |
| */ |
| @throws[NCE] |
| def resetPassword(id: Long, newPasswd: String, parent: Span = null): Unit = |
| startScopedSpan("resetPassword", parent, "usrId" -> id) { span => |
| NCSql.sql { |
| val usr = NCSqlManager.getUserById(id, span).getOrElse(throw new NCE(s"Unknown user ID: $id")) |
| |
| require(usr.email.isDefined) |
| |
| val salt = NCBlowfishHasher.hash(usr.email.get) |
| |
| // Add actual hash for the password. |
| // NOTE: we don't "stir up" password pool for password resets. |
| NCSqlManager.addPasswordHash(pwdSeq.incrementAndGet(), NCBlowfishHasher.hash(newPasswd, salt), span) |
| } |
| |
| catching(wrapIE) { |
| NCTxManager.startTx { |
| idSigninCache(id) match { |
| case Some(toks) => |
| tokenSigninCache --= toks.toSeq |
| idSigninCache -= id |
| |
| case None => // No-op. |
| } |
| } |
| } |
| } |
| |
| /** |
| * |
| * @param compId |
| * @param email |
| * @param pwd |
| * @param firstName |
| * @param lastName |
| * @param avatarUrl |
| * @param isAdmin |
| * @param props |
| * @param usrExtIdOpt |
| * @param parent Optional parent span. |
| */ |
| @throws[NCE] |
| def addUser( |
| compId: Long, |
| email: String, |
| pwd: String, |
| firstName: String, |
| lastName: String, |
| avatarUrl: Option[String], |
| isAdmin: Boolean, |
| props: Option[String], |
| usrExtIdOpt: Option[String], |
| parent: Span = null |
| ): Long = |
| startScopedSpan( |
| "addUser", |
| parent, |
| "compId" -> compId, |
| "email" -> email, |
| "firstName" -> firstName, |
| "lastName" -> lastName, |
| "usrExtId" -> usrExtIdOpt.orNull, |
| "isAdmin" -> isAdmin) { span => |
| val normEmail = U.normalizeEmail(email) |
| |
| if (!EMAIL_VALIDATOR.isValid(normEmail)) |
| throw new NCE(s"New user email is invalid: $normEmail") |
| |
| val salt = NCBlowfishHasher.hash(normEmail) |
| |
| NCSql.sql { |
| // Some database implementations (including Ignite database) may not support unique constraints. |
| // Because we have to support user email unique values, adding user operation is synchronized. |
| val id = |
| try { |
| userLock.acquire() |
| |
| if (NCSqlManager.getUserByEmail(normEmail, span).isDefined) |
| throw new NCE(s"User with this email already exists: $normEmail") |
| |
| usrExtIdOpt match { |
| case Some(usrExtId) => |
| val id = NCSqlManager. |
| getUserId(compId, usrExtId, span). |
| getOrElse(throw new NCE(s"User not found [companyId=$compId, extId=$usrExtId]")) |
| |
| NCSqlManager.updateUser( |
| id = id, |
| email = normEmail, |
| passwdSalt = salt, |
| firstName = firstName, |
| lastName = lastName, |
| avatarUrl = avatarUrl, |
| propsOpt = props, |
| parent = span |
| ) |
| |
| logger.info(s"User converted [usrExtId=$usrExtId, email=$email]") |
| |
| id |
| case None => |
| val newUsrId = usersSeq.incrementAndGet() |
| |
| NCSqlManager.addUser( |
| id = newUsrId, |
| compId = compId, |
| usrExtId = None, |
| email = Some(normEmail), |
| firstName = Some(firstName), |
| lastName = Some(lastName), |
| avatarUrl = avatarUrl, |
| passwdSalt = Some(salt), |
| isAdmin = isAdmin, |
| propsOpt = props, |
| parent = span |
| ) |
| |
| logger.info(s"User $email created.") |
| |
| newUsrId |
| } |
| } |
| finally |
| userLock.release() |
| |
| // Add actual hash for the password. |
| NCSqlManager.addPasswordHash(pwdSeq.incrementAndGet(), NCBlowfishHasher.hash(pwd, salt), span) |
| |
| // "Stir up" password pool with each user. |
| (0 to Math.round((Math.random() * Config.pwdPoolBlowup) + Config.pwdPoolBlowup).toInt).foreach(_ => |
| NCSqlManager.addPasswordHash(pwdSeq.incrementAndGet(), NCBlowfishHasher.hash(U.genGuid()), span) |
| ) |
| |
| id |
| } |
| } |
| |
| /** |
| * |
| * @param ses |
| */ |
| @throws[NCE] |
| private def clearSigninCache(ses: SigninSession): Unit = |
| startScopedSpan("clearSigninCache", "usrId" -> ses.userId) { _ => |
| catching(wrapIE) { |
| idSigninCache(ses.userId) match { |
| case Some(toks) => |
| val fixedToks = toks -- Seq(ses.acsToken) |
| |
| if (fixedToks.isEmpty) |
| idSigninCache -= ses.userId |
| else |
| idSigninCache += ses.userId -> fixedToks |
| |
| case None => // No-op. |
| } |
| } |
| } |
| |
| /** |
| * Gets technical user ID for given external ID. |
| * If user with given external ID is missing, it creates a new one and returns its user ID. |
| * |
| * @param companyId Company ID. |
| * @param usrExtId External user ID. |
| * @param parent Parent. |
| */ |
| @throws[NCE] |
| def getOrInsertExternalUserId(companyId: Long, usrExtId: String, parent: Span = null): Long = |
| startScopedSpan( |
| "getOrInsertExternalUserId", |
| parent, |
| "companyId" -> companyId, |
| "usrExtId" -> usrExtId) { span => |
| NCSql.sql { |
| NCSqlManager.getUserId(companyId, usrExtId, span) match { |
| case Some(id) => id |
| case None => |
| try { |
| userLock.acquire() |
| |
| NCSqlManager.getUserId(companyId, usrExtId, span) match { |
| case Some(id) => id |
| case None => |
| val id = usersSeq.incrementAndGet() |
| |
| NCSqlManager.addUser( |
| id, |
| companyId, |
| usrExtId = Some(usrExtId), |
| email = None, |
| firstName = None, |
| lastName = None, |
| avatarUrl = None, |
| passwdSalt = None, |
| isAdmin = false, |
| propsOpt = None, |
| parent = parent |
| ) |
| |
| id |
| } |
| } |
| finally |
| userLock.release() |
| } |
| } |
| } |
| |
| /** |
| * Gets technical user ID by given external ID. |
| * |
| * @param companyId Company ID. |
| * @param extUserId External user ID. |
| * @param parent Parent. |
| */ |
| @throws[NCE] |
| def getUserId(companyId: Long, extUserId: String, parent: Span = null): Option[Long] = |
| startScopedSpan("getUser", parent, "companyId" -> companyId, "extUserId" -> extUserId) { span => |
| NCSql.sql { |
| NCSqlManager.getUserId(companyId, extUserId, span) |
| } |
| } |
| } |