blob: dcb869398bca602a6cf76688cd16c61321923720 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License
*/
package org.apache.toree.utils
import scala.language.existentials
import java.util.concurrent._
import java.util.UUID
import com.google.common.util.concurrent.ThreadFactoryBuilder
import ScheduledTaskManager._
import scala.util.Try
/**
* Constructs timing-based events that are periodically executed. Does not
* support hard-killing tasks that are not interruptable.
* @param totalThreads The total threads to create for the underlying thread
* pool
* @param defaultExecutionDelay The default delay to use before all added tasks
* @param defaultTimeInterval The default time interval between tasks in
* milliseconds
*/
class ScheduledTaskManager(
private val totalThreads: Int = DefaultMaxThreads,
private val defaultExecutionDelay: Long = DefaultExecutionDelay,
private val defaultTimeInterval: Long = DefaultTimeInterval
) {
private[utils] val _scheduler = new ScheduledThreadPoolExecutor(
totalThreads, new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("scheduled-task-manager-%d")
.build())
_scheduler.setRemoveOnCancelPolicy(true)
private val _taskMap = new ConcurrentHashMap[String, ScheduledFuture[_]]()
/**
* Adds the specified task to the queued list to execute at the specified
* time interval.
* @param executionDelay The time delay (in milliseconds) before starting
* @param timeInterval The time interval (in milliseconds)
* @param task The task to execute
* @tparam T The type of return result (currently ignored)
* @return The id of the task
*/
def addTask[T](
executionDelay: Long = defaultExecutionDelay,
timeInterval: Long = defaultTimeInterval,
task: => T
) = {
val taskId = UUID.randomUUID().toString
val runnable: Runnable = new Runnable {
override def run(): Unit = Try(task)
}
// Schedule our task at the desired interval
_taskMap.put(taskId, _scheduler.scheduleAtFixedRate(
runnable, executionDelay, timeInterval, TimeUnit.MILLISECONDS))
taskId
}
/**
* Removes the specified task from the manager.
* @param taskId The id of the task to remove
* @return True if the task was removed, otherwise false
*/
def removeTask(taskId: String): Boolean = {
// Exit if the task with the given id does not exist
if (taskId == null || !_taskMap.containsKey(taskId)) return false
val future = _taskMap.remove(taskId)
// Stop the future, but allow the current task to finish
future.cancel(false)
true
}
/**
* Shuts down the thread pool used for task execution.
*/
def stop() = {
_taskMap.clear()
_scheduler.shutdown()
}
}
object ScheduledTaskManager {
val DefaultMaxThreads = 4
val DefaultExecutionDelay = 10 // 10 milliseconds
val DefaultTimeInterval = 100 // 100 milliseconds
}