blob: d186b78ee7bbf64a67b4ccabecf23e1e08410d67 [file] [log] [blame]
/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.engineconn.acessible.executor.lock
import java.util.concurrent.{ScheduledFuture, ScheduledThreadPoolExecutor, Semaphore, TimeUnit}
import com.webank.wedatasphere.linkis.common.utils.Logging
import com.webank.wedatasphere.linkis.engineconn.acessible.executor.conf.AccessibleExecutorConfiguration
import com.webank.wedatasphere.linkis.engineconn.acessible.executor.entity.AccessibleExecutor
import com.webank.wedatasphere.linkis.engineconn.acessible.executor.listener.ExecutorStatusListener
import com.webank.wedatasphere.linkis.engineconn.acessible.executor.listener.event.{ExecutorCompletedEvent, ExecutorCreateEvent, ExecutorStatusChangedEvent, ExecutorUnLockEvent}
import com.webank.wedatasphere.linkis.engineconn.core.executor.ExecutorManager
import com.webank.wedatasphere.linkis.engineconn.executor.listener.ExecutorListenerBusContext
import com.webank.wedatasphere.linkis.engineconn.executor.service.ManagerService
import com.webank.wedatasphere.linkis.manager.common.entity.enumeration.NodeStatus
class EngineConnTimedLock(private var timeout: Long) extends TimedLock with Logging with ExecutorStatusListener {
var lock = new Semaphore(1)
val releaseScheduler = new ScheduledThreadPoolExecutor(1)
var releaseTask: ScheduledFuture[_] = null
var lastLockTime: Long = 0
var lockedBy: AccessibleExecutor = null
override def acquire(executor: AccessibleExecutor): Unit = {
lock.acquire()
lastLockTime = System.currentTimeMillis()
lockedBy = executor
scheduleTimeout
}
override def tryAcquire(executor: AccessibleExecutor): Boolean = {
if (null == executor || NodeStatus.Unlock != executor.getStatus) return false
val succeed = lock.tryAcquire()
debug("try to lock for succeed is " + succeed.toString)
if (succeed) {
lastLockTime = System.currentTimeMillis()
lockedBy = executor
debug("try to lock for add time out task ! Locked by thread : " + lockedBy.getId)
scheduleTimeout
}
succeed
}
// Unlock callback is not called in release method, because release method is called actively
override def release(): Unit = {
debug("try to release for lock," + lockedBy + ",current thread " + Thread.currentThread().getName)
if (lockedBy != null) {
//&& lockedBy == Thread.currentThread() Inconsistent thread(线程不一致)
debug("try to release for lockedBy and thread ")
if (releaseTask != null) {
releaseTask.cancel(true)
releaseTask = null
}
debug("try to release for lock release success")
lockedBy = null
}
unlockCallback(lock.toString)
resetLock()
}
private def resetLock(): Unit = {
lock.release()
lock = new Semaphore(1)
}
override def forceRelease(): Unit = {
if (isAcquired()) {
if (releaseTask != null) {
releaseTask.cancel(true)
releaseTask = null
releaseScheduler.purge()
}
lock.release()
lockedBy = null
}
resetLock()
}
private def scheduleTimeout: Unit = {
synchronized {
if (null == releaseTask ) {
releaseTask = releaseScheduler.scheduleWithFixedDelay(new Runnable {
override def run(): Unit = {
synchronized {
if (isAcquired() && NodeStatus.Idle == lockedBy.getStatus && isExpired()) {
// unlockCallback depends on lockedBy, so lockedBy cannot be set null before unlockCallback
info(s"Lock : [${lock.toString} was released due to timeout." )
release()
} else if (isAcquired() && NodeStatus.Busy == lockedBy.getStatus) {
lastLockTime = System.currentTimeMillis()
info("Update lastLockTime because executor is busy.")
}
}
}
}, 3000, AccessibleExecutorConfiguration.ENGINECONN_LOCK_CHECK_INTERVAL.getValue.toLong, TimeUnit.MILLISECONDS)
info("Add scheduled timeout task.")
}
}
}
override def isAcquired(): Boolean = {
lock.availablePermits() < 1
}
override def isExpired(): Boolean = {
if (lastLockTime == 0) return false
if (timeout <= 0) return false
System.currentTimeMillis() - lastLockTime > timeout
}
override def numOfPending(): Int = {
lock.getQueueLength
}
override def renew(): Boolean = {
if (lockedBy != null && lockedBy == Thread.currentThread()) {
if (isAcquired && releaseTask != null) {
if (releaseTask.cancel(false)) {
releaseScheduler.purge()
scheduleTimeout
lastLockTime = System.currentTimeMillis()
return true
}
}
}
false
}
override def resetTimeout(timeout: Long): Unit = synchronized {
if (isAcquired()) {
if (null != releaseTask && !isExpired()) {
releaseTask.cancel(true)
this.timeout = timeout
}
scheduleTimeout
} else {
error("Lock is not acquired, so cannot be reset-Timeout")
}
}
private def unlockCallback(lockStr: String): Unit = {
/*if (null != lockedBy) {
lockedBy.transition(NodeStatus.Unlock)
}*/
val executors = ExecutorManager.getInstance.getExecutors.filter(executor => null != executor && !executor.isClosed)
if (null != executors && !executors.isEmpty) {
executors.foreach(executor => executor match {
case accessibleExecutor: AccessibleExecutor =>
accessibleExecutor.transition(NodeStatus.Unlock)
case _ =>
})
}
ExecutorListenerBusContext.getExecutorListenerBusContext().getEngineConnAsyncListenerBus.post(ExecutorUnLockEvent(null, lockStr.toString))
}
override def onExecutorCreated(executorCreateEvent: ExecutorCreateEvent): Unit = {}
override def onExecutorCompleted(executorCompletedEvent: ExecutorCompletedEvent): Unit = {}
override def onExecutorStatusChanged(executorStatusChangedEvent: ExecutorStatusChangedEvent): Unit = {
val toStatus = executorStatusChangedEvent.toStatus
if (isAcquired() && NodeStatus.Idle == toStatus) {
info(s"Status changed to ${toStatus.name()}, update lastUpdatedTime for lock.")
lastLockTime = System.currentTimeMillis()
scheduleTimeout
}
}
}