blob: 581b08a5e737335ab1a7447f38ad93542d47cc1f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.linkis.engineconn.acessible.executor.execution
import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.engineconn.acessible.executor.conf.AccessibleExecutorConfiguration
import org.apache.linkis.engineconn.acessible.executor.entity.AccessibleExecutor
import org.apache.linkis.engineconn.common.creation.EngineCreationContext
import org.apache.linkis.engineconn.common.engineconn.EngineConn
import org.apache.linkis.engineconn.common.execution.EngineConnExecution
import org.apache.linkis.engineconn.core.EngineConnObject
import org.apache.linkis.engineconn.core.executor.ExecutorManager
import org.apache.linkis.engineconn.core.hook.ShutdownHook
import org.apache.linkis.engineconn.executor.entity.{Executor, LabelExecutor, ResourceExecutor}
import org.apache.linkis.engineconn.executor.listener.ExecutorListenerBusContext
import org.apache.linkis.engineconn.executor.service.ManagerService
import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
import org.apache.linkis.manager.common.protocol.engine.{ECCanKillRequest, EngineConnReleaseRequest}
import org.apache.linkis.manager.common.protocol.resource.ResourceUsedProtocol
import org.apache.linkis.manager.label.utils.LabelUtil
import org.apache.linkis.rpc.Sender
import java.util.concurrent.TimeUnit
class AccessibleEngineConnExecution extends EngineConnExecution with Logging {
protected def findReportExecutor(
engineCreationContext: EngineCreationContext,
engineConn: EngineConn
): Executor =
ExecutorManager.getInstance.getReportExecutor
protected def beforeReportToLinkisManager(
executor: Executor,
engineCreationContext: EngineCreationContext,
engineConn: EngineConn
): Unit = {}
protected def afterReportToLinkisManager(
executor: Executor,
engineCreationContext: EngineCreationContext,
engineConn: EngineConn
): Unit = {}
override def execute(
engineCreationContext: EngineCreationContext,
engineConn: EngineConn
): Unit = {
init(engineCreationContext)
val executor = findReportExecutor(engineCreationContext, engineConn)
logger.info(s"Created a report executor ${executor.getClass.getSimpleName}(${executor.getId}).")
beforeReportToLinkisManager(executor, engineCreationContext, engineConn)
reportUsedResource(executor, engineCreationContext)
reportLabel(executor)
executorStatusChecker
afterReportToLinkisManager(executor, engineCreationContext, engineConn)
}
protected def init(engineCreationContext: EngineCreationContext): Unit = {
val listenerBusContext = ExecutorListenerBusContext.getExecutorListenerBusContext()
listenerBusContext.getEngineConnAsyncListenerBus.start()
}
private def executorStatusChecker(): Unit = {
val context = EngineConnObject.getEngineCreationContext
val maxFreeTimeVar =
AccessibleExecutorConfiguration.ENGINECONN_MAX_FREE_TIME.getValue(context.getOptions)
val maxFreeTimeStr = maxFreeTimeVar.toString
val maxFreeTime = maxFreeTimeVar.toLong
logger.info("executorStatusChecker created, maxFreeTimeMills is " + maxFreeTime)
Utils.defaultScheduler.scheduleAtFixedRate(
new Runnable {
override def run(): Unit = Utils.tryAndWarn {
val accessibleExecutor = ExecutorManager.getInstance.getReportExecutor match {
case executor: AccessibleExecutor => executor
case executor: Executor =>
logger.warn(
s"Executor(${executor.getId}) is not a AccessibleExecutor, do noting when reached max free time ."
)
return
}
val nodeStatus = accessibleExecutor.getStatus
if (NodeStatus.isCompleted(accessibleExecutor.getStatus)) {
logger.error(
s"${accessibleExecutor.getId} has completed with status ${accessibleExecutor.getStatus}, now stop it."
)
requestManagerReleaseExecutor("Completed release", nodeStatus)
ShutdownHook.getShutdownHook.notifyStop()
} else if (accessibleExecutor.getStatus == NodeStatus.ShuttingDown) {
logger.warn(s"${accessibleExecutor.getId} is ShuttingDown...")
requestManagerReleaseExecutor(" ShuttingDown release", nodeStatus)
ShutdownHook.getShutdownHook.notifyStop()
} else if (
maxFreeTime > 0 && (NodeStatus.Unlock.equals(
accessibleExecutor.getStatus
) || NodeStatus.Idle.equals(accessibleExecutor.getStatus))
&& System.currentTimeMillis - accessibleExecutor.getLastActivityTime > maxFreeTime
) {
if (ifECCanMaintain()) {
logger.info("ec will not be killed at this time")
accessibleExecutor.updateLastActivityTime()
} else {
logger.warn(
s"${accessibleExecutor.getId} has not been used for $maxFreeTimeStr, now try to shutdown it."
)
accessibleExecutor.tryShutdown()
requestManagerReleaseExecutor(" idle release", nodeStatus)
ShutdownHook.getShutdownHook.notifyStop()
}
}
}
},
3 * 60 * 1000,
AccessibleExecutorConfiguration.ENGINECONN_STATUS_SCAN_TIME.getValue.toLong,
TimeUnit.MILLISECONDS
)
}
def requestManagerReleaseExecutor(msg: String, nodeStatus: NodeStatus): Unit = {
val engineReleaseRequest = new EngineConnReleaseRequest(
Sender.getThisServiceInstance,
Utils.getJvmUser,
msg,
EngineConnObject.getEngineCreationContext.getTicketId
)
engineReleaseRequest.setNodeStatus(nodeStatus)
logger.info("To send release request to linkis manager")
ManagerService.getManagerService.requestReleaseEngineConn(engineReleaseRequest)
}
private def isMaintainSupported(): Boolean = {
if (!AccessibleExecutorConfiguration.ENABLE_MAINTAIN.getValue) return false
val userCreator =
LabelUtil.getUserCreatorLabel(EngineConnObject.getEngineCreationContext.getLabels())
if (
null != userCreator && AccessibleExecutorConfiguration.ENABLE_MAINTAIN_CREATORS.getValue
.contains(userCreator.getCreator)
) {
logger.info(s"${userCreator.getStringValue} maintain enabled")
true
} else {
false
}
}
private def ifECCanMaintain(): Boolean = {
if (!isMaintainSupported()) return false
val engineTypeLabel =
LabelUtil.getEngineTypeLabel(EngineConnObject.getEngineCreationContext.getLabels())
val userCreator =
LabelUtil.getUserCreatorLabel(EngineConnObject.getEngineCreationContext.getLabels())
if (null == engineTypeLabel) return false
val ecCanKillRequest = new ECCanKillRequest
ecCanKillRequest.setEngineConnInstance(Sender.getThisServiceInstance)
ecCanKillRequest.setUser(userCreator.getUser)
ecCanKillRequest.setUserCreatorLabel(userCreator)
ecCanKillRequest.setEngineTypeLabel(engineTypeLabel)
Utils.tryCatch {
val response = ManagerService.getManagerService.ecCanKillRequest(ecCanKillRequest)
logger.info(s"From manager get $response")
!response.getFlag
} { throwable: Throwable =>
logger.warn("Failed to ecCanKillRequestManager, will be default exit", throwable)
false
}
}
protected def reportUsedResource(
executor: Executor,
engineCreationContext: EngineCreationContext
): Unit = executor match {
case resourceExecutor: ResourceExecutor =>
ManagerService.getManagerService
.reportUsedResource(
ResourceUsedProtocol(
Sender.getThisServiceInstance,
resourceExecutor.getCurrentNodeResource(),
engineCreationContext.getTicketId
)
)
logger.info("In the first time, report usedResources to LinkisManager succeed.")
case _ =>
logger.info("Do not need to report usedResources.")
}
protected def reportLabel(executor: Executor): Unit = executor match {
case labelExecutor: LabelExecutor =>
ManagerService.getManagerService.labelReport(labelExecutor.getExecutorLabels())
logger.info("In the first time, report all labels to LinkisManager succeed.")
case _ =>
logger.info("Do not need to report labels.")
}
/**
* Accessible should be executed by the first, because it will instance the report executor.
*
* @return
*/
override def getOrder: Int = 10
}