blob: 0055122bf58637ad60c9dc8922a39569adfd75b2 [file] [log] [blame]
/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.engineconn.acessible.executor.execution
import java.util.concurrent.TimeUnit
import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils}
import com.webank.wedatasphere.linkis.engineconn.acessible.executor.conf.AccessibleExecutorConfiguration
import com.webank.wedatasphere.linkis.engineconn.acessible.executor.entity.AccessibleExecutor
import com.webank.wedatasphere.linkis.engineconn.common.creation.EngineCreationContext
import com.webank.wedatasphere.linkis.engineconn.common.engineconn.EngineConn
import com.webank.wedatasphere.linkis.engineconn.common.execution.EngineConnExecution
import com.webank.wedatasphere.linkis.engineconn.core.EngineConnObject
import com.webank.wedatasphere.linkis.engineconn.core.executor.ExecutorManager
import com.webank.wedatasphere.linkis.engineconn.core.hook.ShutdownHook
import com.webank.wedatasphere.linkis.engineconn.executor.entity.{Executor, LabelExecutor, ResourceExecutor}
import com.webank.wedatasphere.linkis.engineconn.executor.listener.ExecutorListenerBusContext
import com.webank.wedatasphere.linkis.engineconn.executor.service.ManagerService
import com.webank.wedatasphere.linkis.manager.common.entity.enumeration.NodeStatus
import com.webank.wedatasphere.linkis.manager.common.protocol.engine.EngineConnReleaseRequest
import com.webank.wedatasphere.linkis.manager.common.protocol.resource.ResourceUsedProtocol
import com.webank.wedatasphere.linkis.rpc.Sender
import org.apache.commons.lang.exception.ExceptionUtils
class AccessibleEngineConnExecution extends EngineConnExecution with Logging {
protected def findReportExecutor(engineCreationContext: EngineCreationContext,
engineConn: EngineConn): Executor =
ExecutorManager.getInstance.getReportExecutor
protected def beforeReportToLinkisManager(executor: Executor,
engineCreationContext: EngineCreationContext,
engineConn: EngineConn): Unit = {}
protected def afterReportToLinkisManager(executor: Executor,
engineCreationContext: EngineCreationContext,
engineConn: EngineConn): Unit = {}
override def execute(engineCreationContext: EngineCreationContext, engineConn: EngineConn): Unit = {
init(engineCreationContext)
val executor = findReportExecutor(engineCreationContext, engineConn)
info(s"Created a report executor ${executor.getClass.getSimpleName}(${executor.getId}).")
beforeReportToLinkisManager(executor, engineCreationContext, engineConn)
reportUsedResource(executor, engineCreationContext)
reportLabel(executor)
executorStatusChecker
afterReportToLinkisManager(executor, engineCreationContext, engineConn)
}
protected def init(engineCreationContext: EngineCreationContext): Unit = {
val listenerBusContext = ExecutorListenerBusContext.getExecutorListenerBusContext()
listenerBusContext.getEngineConnAsyncListenerBus.start()
}
private def executorStatusChecker(): Unit = {
val context = EngineConnObject.getEngineCreationContext
val maxFreeTimeVar = AccessibleExecutorConfiguration.ENGINECONN_MAX_FREE_TIME.getValue(context.getOptions)
val maxFreeTimeStr = maxFreeTimeVar.toString
val maxFreeTime = maxFreeTimeVar.toLong
info("executorStatusChecker created, maxFreeTimeMills is " + maxFreeTime)
Utils.defaultScheduler.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryAndWarn {
val accessibleExecutor = ExecutorManager.getInstance.getReportExecutor match {
case executor: AccessibleExecutor => executor
case executor: Executor =>
warn(s"Executor(${executor.getId}) is not a AccessibleExecutor, do noting when reached max free time .")
return
}
if (NodeStatus.isCompleted(accessibleExecutor.getStatus)) {
error(s"${accessibleExecutor.getId} has completed with status ${accessibleExecutor.getStatus}, now stop it.")
ShutdownHook.getShutdownHook.notifyStop()
} else if (accessibleExecutor.getStatus == NodeStatus.ShuttingDown) {
warn(s"${accessibleExecutor.getId} is ShuttingDown...")
ShutdownHook.getShutdownHook.notifyStop()
} else if (maxFreeTime > 0 && (NodeStatus.Unlock.equals(accessibleExecutor.getStatus) || NodeStatus.Idle.equals(accessibleExecutor.getStatus) )
&& System.currentTimeMillis - accessibleExecutor.getLastActivityTime > maxFreeTime) {
warn(s"${accessibleExecutor.getId} has not been used for $maxFreeTimeStr, now try to shutdown it.")
ShutdownHook.getShutdownHook.notifyStop()
requestManagerReleaseExecutor(" idle release")
Utils.defaultScheduler.scheduleWithFixedDelay(new Runnable {
override def run(): Unit = {
Utils.tryCatch {
warn(s"Now exit with code ${ShutdownHook.getShutdownHook.getExitCode()}")
System.exit(ShutdownHook.getShutdownHook.getExitCode())
} { t =>
error(s"Exit error : ${ExceptionUtils.getRootCauseMessage(t)}.", t)
System.exit(-1)
}
}
}, 3000,1000*10, TimeUnit.MILLISECONDS)
}
}
}, 3 * 60 * 1000, AccessibleExecutorConfiguration.ENGINECONN_HEARTBEAT_TIME.getValue.toLong, TimeUnit.MILLISECONDS)
}
def requestManagerReleaseExecutor(msg: String): Unit = {
val engineReleaseRequest = new EngineConnReleaseRequest(Sender.getThisServiceInstance, Utils.getJvmUser, msg, EngineConnObject.getEngineCreationContext.getTicketId)
ManagerService.getManagerService.requestReleaseEngineConn(engineReleaseRequest)
}
protected def reportUsedResource(executor: Executor, engineCreationContext: EngineCreationContext): Unit = executor match {
case resourceExecutor: ResourceExecutor =>
ManagerService.getManagerService
.reportUsedResource(ResourceUsedProtocol(Sender.getThisServiceInstance,
resourceExecutor.getCurrentNodeResource(), engineCreationContext.getTicketId))
info("In the first time, report usedResources to LinkisManager succeed.")
case _ =>
info("Do not need to report usedResources.")
}
protected def reportLabel(executor: Executor): Unit = executor match {
case labelExecutor: LabelExecutor =>
ManagerService.getManagerService.labelReport(labelExecutor.getExecutorLabels())
info("In the first time, report all labels to LinkisManager succeed.")
case _ =>
info("Do not need to report labels.")
}
/**
* Accessible should be executed by the first, because it will instance the report executor.
*
* @return
*/
override def getOrder: Int = 10
}