blob: 9960e4b72d8c5e0bdfe649e6fe7749f626bf1ebb [file] [log] [blame]
/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.engineconn.acessible.executor.service
import com.webank.wedatasphere.linkis.common.utils.Logging
import com.webank.wedatasphere.linkis.engineconn.acessible.executor.conf.AccessibleExecutorConfiguration
import com.webank.wedatasphere.linkis.engineconn.acessible.executor.entity.AccessibleExecutor
import com.webank.wedatasphere.linkis.engineconn.acessible.executor.listener.event.{ExecutorLockEvent, ExecutorUnLockEvent}
import com.webank.wedatasphere.linkis.engineconn.acessible.executor.lock.EngineConnTimedLock
import com.webank.wedatasphere.linkis.engineconn.core.executor.ExecutorManager
import com.webank.wedatasphere.linkis.engineconn.executor.listener.ExecutorListenerBusContext
import com.webank.wedatasphere.linkis.governance.common.exception.engineconn.{EngineConnExecutorErrorCode, EngineConnExecutorErrorException}
import com.webank.wedatasphere.linkis.manager.common.entity.enumeration.NodeStatus
import com.webank.wedatasphere.linkis.manager.common.protocol.engine.EngineLockType
import com.webank.wedatasphere.linkis.manager.common.protocol.{RequestEngineLock, RequestEngineUnlock, ResponseEngineLock, ResponseEngineUnlock}
import com.webank.wedatasphere.linkis.message.annotation.Receiver
import com.webank.wedatasphere.linkis.server.BDPJettyServerHelper
import org.apache.commons.lang.StringUtils
/**
 * Lock service that hands out at most one lock at a time, backed by a single
 * [[EngineConnTimedLock]] which is created lazily on the first lock request.
 *
 * `isLockExist`, `tryLock` and `unlock` are `synchronized` on this instance.
 */
class EngineConnTimedLockService extends LockService with Logging {

  // Created inside timedLock on the first request; null until then.
  private var engineConnLock: EngineConnTimedLock = _

  // String form of the lock handed out by the last successful tryLock; reset to null by unlock.
  private var lockString: String = _

  // Lock type carried by the most recent tryLock request.
  private var lockType: EngineLockType = EngineLockType.Timed

  private def isSupportParallelism: Boolean = AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM.getValue

  /**
   * Checks whether the given lock string identifies the lock currently held.
   *
   * When parallelism is enabled in the configuration this always answers `true`.
   * Otherwise the answer is `true` only if a lock object exists, it is acquired,
   * `lock` is not blank and `lock` equals the stored lock string.
   *
   * @param lock the lock string to verify (may be null or blank)
   * @return whether the lock is considered to exist
   */
  override def isLockExist(lock: String = null): Boolean = synchronized {
    if (isSupportParallelism) true
    else {
      null != engineConnLock &&
        engineConnLock.isAcquired() &&
        StringUtils.isNotBlank(lock) &&
        lock.equals(lockString)
    }
  }

  /**
   * Try to lock an Executor in the ExecutorManager. If the lock is successful,
   * the lock string is returned as the identifier.
   *
   * @param requestEngineLock the lock request; its `lockType` selects between a
   *                          lock requested with timeout `-1` (`Always`) and one
   *                          requested with the request's own `timeout` (`Timed`)
   * @return `None` if the lock is already acquired or could not be acquired,
   *         `Some(lockString)` on success, and `Some(null)` for an unsupported lock type
   */
  @throws[EngineConnExecutorErrorException]
  override def tryLock(requestEngineLock: RequestEngineLock): Option[String] = synchronized {
    if (null != engineConnLock && engineConnLock.isAcquired()) {
      None
    } else {
      this.lockType = requestEngineLock.lockType
      lockType match {
        case EngineLockType.Always =>
          timedLock(-1)
        case EngineLockType.Timed =>
          timedLock(requestEngineLock.timeout)
        // A bare variable pattern (unlike `o: Any`) also matches null, so a request
        // without a lock type is reported as invalid instead of throwing MatchError.
        case o =>
          error("Invalid lockType : " + BDPJettyServerHelper.gson.toJson(o))
          // NOTE(review): Some(null) is kept as-is; presumably the caller treats a blank
          // lock string as a failed lock - verify against LockService.requestLock.
          Some(null)
      }
    }
  }

  /**
   * Acquires the engine lock on behalf of the report executor.
   *
   * On success the lock string is stored, an [[ExecutorLockEvent]] is posted and
   * the executor is transitioned to `NodeStatus.Idle`.
   *
   * @param timeout timeout handed to the [[EngineConnTimedLock]] constructor
   * @return `Some(lockString)` if the lock was acquired, otherwise `None`
   * @throws EngineConnExecutorErrorException if the report executor is not an [[AccessibleExecutor]]
   */
  private def timedLock(timeout: Long): Option[String] = {
    // The lock is bound to the engineconn, so use the report executor.
    ExecutorManager.getInstance.getReportExecutor match {
      case accessibleExecutor: AccessibleExecutor =>
        debug("try to lock for executor state is " + accessibleExecutor.getStatus)
        debug("try to lock for executor id is " + accessibleExecutor.getId)
        // NOTE(review): the lock object is created only once, so the `timeout` of a later
        // request is not passed anywhere on this path - confirm this is intended.
        if (null == engineConnLock) {
          engineConnLock = new EngineConnTimedLock(timeout)
          ExecutorListenerBusContext.getExecutorListenerBusContext().getEngineConnAsyncListenerBus.addListener(engineConnLock)
          debug("try to lock for executor get new lock " + engineConnLock)
        }
        if (engineConnLock.tryAcquire(accessibleExecutor)) {
          debug("try to lock for tryAcquire is true ")
          this.lockString = engineConnLock.lock.toString
          ExecutorListenerBusContext.getExecutorListenerBusContext().getEngineConnAsyncListenerBus.post(ExecutorLockEvent(accessibleExecutor, lockString))
          accessibleExecutor.transition(NodeStatus.Idle)
          Some(lockString)
        } else {
          None
        }
      case _ =>
        val msg = s"Invalid executor or not instance of SensibleEngine."
        error(msg)
        throw new EngineConnExecutorErrorException(EngineConnExecutorErrorCode.INVALID_ENGINE_TYPE, msg)
    }
  }

  /**
   * Unlock.
   *
   * Releases the engine lock if `lock` passes [[isLockExist]], posts an
   * [[ExecutorUnLockEvent]] and transitions the report executor to `NodeStatus.Unlock`.
   *
   * @param lock the lock string previously returned by `tryLock`
   * @return `true` if the lock was released; `false` if no lock object has been
   *         created yet or `lock` does not pass [[isLockExist]]
   */
  override def unlock(lock: String): Boolean = synchronized {
    if (null == engineConnLock) {
      // No lock object has been created yet, so there is nothing to release.
      // Guarding here avoids a NullPointerException from dereferencing engineConnLock below.
      info("try to unlock for lockEntity is null,and lock is " + lock)
      false
    } else {
      info("try to unlock for lockEntity is " + engineConnLock.toString + ",and lock is " + lock + ",acquired is " + engineConnLock.isAcquired().toString)
      if (isLockExist(lock)) {
        // lockedBy may be unset (e.g. parallelism mode reports every lock as existing),
        // so read the holder id null-safely; it is only used for logging.
        val lockedById = Option(engineConnLock.lockedBy).fold("null")(holder => String.valueOf(holder.getId))
        info(s"try to unlock lockEntity : lockString=$lockString,lockedBy=$lockedById")
        engineConnLock.release()
        this.lockString = null
        ExecutorListenerBusContext.getExecutorListenerBusContext().getEngineConnAsyncListenerBus.post(ExecutorUnLockEvent(null, lock))
        ExecutorManager.getInstance.getReportExecutor match {
          case accessibleExecutor: AccessibleExecutor =>
            accessibleExecutor.transition(NodeStatus.Unlock)
          case _ =>
            // The lock has already been released at this point; only the status transition is skipped.
            val msg = s"Invalid executor or not instance of SensibleEngine."
            error(msg)
        }
        true
      } else {
        false
      }
    }
  }

  /**
   * Message receiver for unlock requests.
   *
   * @param requestEngineUnlock the unlock request carrying the lock string
   * @return `ResponseEngineUnlock(false)` when the lock string is blank, otherwise
   *         a response wrapping the result of [[unlock]]
   */
  @Receiver
  override def requestUnLock(requestEngineUnlock: RequestEngineUnlock): ResponseEngineUnlock = {
    if (StringUtils.isBlank(requestEngineUnlock.lock)) {
      error("Invalid requestEngineUnlock: ")
      ResponseEngineUnlock(false)
    } else {
      ResponseEngineUnlock(unlock(requestEngineUnlock.lock))
    }
  }

  /** Lock-added callback; intentionally a no-op in this service. */
  override def onAddLock(addLockEvent: ExecutorLockEvent): Unit = {
  }

  /** Lock-released callback; intentionally a no-op in this service. */
  override def onReleaseLock(releaseLockEvent: ExecutorUnLockEvent): Unit = {
  }

  /**
   * Message receiver for lock requests; delegates to the inherited implementation.
   * The override exists so the method carries the `@Receiver` annotation on this class.
   */
  @Receiver
  override def requestLock(requestEngineLock: RequestEngineLock): ResponseEngineLock = {
    super.requestLock(requestEngineLock)
  }
}
/**
 * Lock service variant that never refuses a lock: every query, lock and unlock
 * call succeeds, and a fixed placeholder lock string is handed out.
 */
class EngineConnConcurrentLockService extends LockService {

  /**
   * Every lock string is reported as existing.
   *
   * @param lock ignored
   * @return always `true`
   */
  override def isLockExist(lock: String): Boolean = {
    true
  }

  /**
   * Always grants the lock.
   *
   * A concurrent engine does not change its status when a lock is obtained
   * (status changes are driven by other rules), so no executor transition happens here.
   *
   * @param requestEngineLock ignored
   * @return always `Some("lock")`
   */
  override def tryLock(requestEngineLock: RequestEngineLock): Option[String] = {
    val grantedLock = "lock"
    Some(grantedLock)
  }

  /**
   * Unlock. Always reports success and performs no executor status transition.
   *
   * @param lock ignored
   * @return always `true`
   */
  override def unlock(lock: String): Boolean = {
    true
  }

  /** Message receiver for unlock requests; always answers with a successful response. */
  @Receiver
  override def requestUnLock(requestEngineUnlock: RequestEngineUnlock): ResponseEngineUnlock = {
    ResponseEngineUnlock(true)
  }

  /** Lock-added callback; nothing to do. */
  override def onAddLock(addLockEvent: ExecutorLockEvent): Unit = ()

  /** Lock-released callback; nothing to do. */
  override def onReleaseLock(releaseLockEvent: ExecutorUnLockEvent): Unit = ()

  /** Message receiver for lock requests; forwards to the inherited implementation. */
  @Receiver
  override def requestLock(requestEngineLock: RequestEngineLock): ResponseEngineLock =
    super.requestLock(requestEngineLock)
}