// blob: 5b174bdb9f6e4a0baa06c84e9e0635c12b7e2364
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.streaming.task
import java.time.Instant
import scala.concurrent.duration.FiniteDuration
import akka.actor.Actor.Receive
import akka.actor.{ActorRef, ActorSystem, Cancellable, Props}
import org.slf4j.Logger
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.util.LogUtil
import org.apache.gearpump.{Message, TimeStamp}
/**
* This provides context information for a task.
*/
trait TaskContext {

  // ---- Identity ----------------------------------------------------------

  /** @return the id of the task this context belongs to */
  def taskId: TaskId

  /** @return the id of the executor (presumably the one hosting this task; confirm in impl) */
  def executorId: Int

  /** @return the id of the application */
  def appId: Int

  /** @return the name of the application */
  def appName: String

  /**
   * Parallelism configured for this task.
   *
   * With 3 source tasks and 3 sink tasks, for instance, the parallelism of each is 3.
   *
   * Handy when consuming a partitioned data source: given a Kafka topic with 10 partitions
   * and a parallelism of 2, every task instance is in charge of reading 5 partitions.
   *
   * @return the parallelism level
   */
  def parallelism: Int

  // ---- Actor plumbing ----------------------------------------------------

  /**
   * Actor reference of the AppMaster.
   *
   * @return application master's actor reference
   */
  def appMaster: ActorRef

  /**
   * Avoid relying on this whenever possible.
   *
   * @return self actor ref
   */
  // TODO: We should remove the self from TaskContext
  def self: ActorRef

  /**
   * Avoid relying on this whenever possible.
   *
   * @return the actor system
   */
  // TODO: we should remove this in future
  def system: ActorSystem

  /**
   * Sender of the message currently being handled.
   *
   * For a managed message (one of type Message) the sender merely acts as a unique id;
   * its address carries no meaning and must not be used directly.
   *
   * For an unmanaged message the sender is the ActorRef that sent it.
   *
   * @return sender
   */
  def sender: ActorRef

  /**
   * Obtains an actor reference for the given props.
   * NOTE(review): presumably delegates to Akka's `actorOf`; confirm in the implementation.
   *
   * @param props description of the actor
   * @return reference to the actor
   */
  def actorOf(props: Props): ActorRef

  /**
   * Same as the single-argument `actorOf`, with an explicit name.
   *
   * @param props description of the actor
   * @param name name for the actor
   * @return reference to the actor
   */
  def actorOf(props: Props, name: String): ActorRef

  // ---- Data output -------------------------------------------------------

  /**
   * Emits a message to the downstream tasks. How the data is shuffled between them
   * is decided by the Partitioner.
   *
   * @param msg message to output
   */
  def output(msg: Message): Unit

  // ---- Scheduling --------------------------------------------------------

  /**
   * Schedules `f` using an initial delay and an interval.
   * NOTE(review): presumably the recurring counterpart of `scheduleOnce`; confirm in the
   * implementation.
   *
   * @param initialDelay the initial delay
   * @param interval the interval
   * @param f the function to execute
   * @return a handle through which the schedule can be cancelled
   */
  def schedule(initialDelay: FiniteDuration, interval: FiniteDuration)(f: => Unit): Cancellable

  /**
   * Counterpart of Akka's `scheduleOnce`.
   *
   * @param initialDelay the initial delay
   * @param f the function to execute after initial delay
   * @return a handle through which the pending execution can be cancelled
   */
  def scheduleOnce(initialDelay: FiniteDuration)(f: => Unit): Cancellable

  // ---- Clock and logging -------------------------------------------------

  /**
   * Fetches the upstream min clock from the TaskActor.
   *
   * @return the min clock
   */
  def upstreamMinClock: TimeStamp

  /**
   * The logger depends on the environment, so it is supplied by
   * the containing environment.
   */
  def logger: Logger
}
/**
* Streaming Task interface
*/
trait TaskInterface {

  /**
   * Invoked when the task is initialized.
   *
   * @param startTime point in time from which a source producer task should replay its
   *                  data source, or from which a processor task should restore its
   *                  checkpoint data into in-memory state.
   */
  def onStart(startTime: Instant): Unit

  /**
   * Invoked once for every message received.
   *
   * @param msg Message sent by upstream tasks
   */
  def onNext(msg: Message): Unit

  /**
   * Invoked when the watermark is updated.
   * It is usually safe to output or checkpoint state that is earlier than the watermark.
   *
   * @param watermark represents event time progress.
   */
  def onWatermarkProgress(watermark: Instant): Unit

  /**
   * Invoked while the task is being cleaned up.
   *
   * Use it to release resources once the application has finished.
   */
  def onStop(): Unit

  /**
   * Handler for unmanaged messages.
   *
   * The default defined here is `null`, not an empty partial function.
   *
   * @return the handler
   */
  def receiveUnManagedMessage: Receive = null
}
/**
 * Convenience base class for streaming tasks.
 *
 * Every [[TaskInterface]] callback gets a default implementation here: the lifecycle
 * callbacks do nothing and unmanaged messages are logged as errors. Subclasses override
 * only what they need. The actor system and the task's own actor reference are taken
 * from the [[TaskContext]] and exposed as implicits.
 *
 * @param taskContext context this task runs in
 * @param userConf user configuration; not read by this base class
 */
abstract class Task(taskContext: TaskContext, userConf: UserConfig) extends TaskInterface {

  import taskContext.{appId, executorId, taskId}

  /** Logger tagged with the application, executor and task ids of this task. */
  val LOG: Logger = LogUtil.getLogger(getClass, app = appId, executor = executorId, task = taskId)

  // Implicit definitions carry explicit types: an inferred type on an implicit val is
  // fragile under Scala 2 implicit resolution and is rejected by Scala 3.
  protected implicit val system: ActorSystem = taskContext.system

  implicit val self: ActorRef = taskContext.self

  /**
   * For a managed message (one of type Message) the sender means nothing and
   * should not be used directly.
   *
   * For an unmanaged message the sender is the sending actor.
   *
   * @return the sender
   */
  protected def sender: ActorRef = taskContext.sender

  /** No-op by default. */
  override def onStart(startTime: Instant): Unit = {}

  /** No-op by default. */
  override def onNext(msg: Message): Unit = {}

  /** No-op by default. */
  override def onStop(): Unit = {}

  /**
   * Default handler for unmanaged messages: accepts any message and logs it as an error.
   *
   * @return a handler defined for every message
   */
  override def receiveUnManagedMessage: Receive = {
    case msg =>
      // Interpolation renders a null payload as "null" instead of throwing from toString.
      LOG.error(s"Failed! Received unknown message taskId: $taskId, $msg")
  }

  /** No-op by default. */
  override def onWatermarkProgress(watermark: Instant): Unit = {}
}