| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.samza.container |
| |
| import java.util.{Collections, Objects, Optional} |
| import java.util.concurrent.ScheduledExecutorService |
| |
| import org.apache.samza.SamzaException |
| import org.apache.samza.checkpoint.{Checkpoint, CheckpointId, CheckpointedChangelogOffset, OffsetManager} |
| import org.apache.samza.config.{Config, StreamConfig, TaskConfig} |
| import org.apache.samza.context._ |
| import org.apache.samza.job.model.{JobModel, TaskModel} |
| import org.apache.samza.scheduler.{CallbackSchedulerImpl, EpochTimeScheduler, ScheduledCallback} |
| import org.apache.samza.storage.kv.KeyValueStore |
| import org.apache.samza.storage.TaskStorageManager |
| import org.apache.samza.system._ |
| import org.apache.samza.table.TableManager |
| import org.apache.samza.task._ |
| import org.apache.samza.util.{Logging, ScalaJavaUtil} |
| |
| import scala.collection.JavaConversions._ |
| import scala.collection.JavaConverters._ |
| import scala.collection.Map |
| |
| class TaskInstance( |
| val task: Any, |
| taskModel: TaskModel, |
| val metrics: TaskInstanceMetrics, |
| systemAdmins: SystemAdmins, |
| consumerMultiplexer: SystemConsumers, |
| collector: TaskInstanceCollector, |
| override val offsetManager: OffsetManager = new OffsetManager, |
| storageManager: TaskStorageManager = null, |
| tableManager: TableManager = null, |
| val systemStreamPartitions: java.util.Set[SystemStreamPartition] = Collections.emptySet(), |
| val exceptionHandler: TaskInstanceExceptionHandler = new TaskInstanceExceptionHandler, |
| jobModel: JobModel = null, |
| streamMetadataCache: StreamMetadataCache = null, |
| inputStreamMetadata: Map[SystemStream, SystemStreamMetadata] = Map(), |
| timerExecutor: ScheduledExecutorService = null, |
| jobContext: JobContext, |
| containerContext: ContainerContext, |
| applicationContainerContextOption: Option[ApplicationContainerContext], |
| applicationTaskContextFactoryOption: Option[ApplicationTaskContextFactory[ApplicationTaskContext]], |
| externalContextOption: Option[ExternalContext]) extends Logging with RunLoopTask { |
| |
| val taskName: TaskName = taskModel.getTaskName |
| val isInitableTask = task.isInstanceOf[InitableTask] |
| val isEndOfStreamListenerTask = task.isInstanceOf[EndOfStreamListenerTask] |
| val isClosableTask = task.isInstanceOf[ClosableTask] |
| |
| override val isWindowableTask = task.isInstanceOf[WindowableTask] |
| |
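| // Scheduler for the task's epoch-time timers; timers run on the shared timerExecutor |
| // and ready callbacks are drained in scheduler() below. |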
| override val epochTimeScheduler: EpochTimeScheduler = EpochTimeScheduler.create(timerExecutor) |
| |
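| // Supplies the task's KeyValueStore instances by store name; yields null when no |
| // such store is managed by this task's storage manager. |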
| private val kvStoreSupplier = ScalaJavaUtil.toJavaFunction( |
| (storeName: String) => { |
| if (storageManager != null && storageManager.getStore(storeName).isDefined) { |
| storageManager.getStore(storeName).get.asInstanceOf[KeyValueStore[_, _]] |
| } else { |
| null |
| } |
| }) |
| private val taskContext = new TaskContextImpl(taskModel, metrics.registry, kvStoreSupplier, tableManager, |
| new CallbackSchedulerImpl(epochTimeScheduler), offsetManager, jobModel, streamMetadataCache) |
| // Keep a separate field for this instead of accessing it through Context, since Context throws an exception if it is null |
| private val applicationTaskContextOption = applicationTaskContextFactoryOption |
| .map(_.create(externalContextOption.orNull, jobContext, containerContext, taskContext, |
| applicationContainerContextOption.orNull)) |
| val context = new ContextImpl(jobContext, containerContext, taskContext, |
| Optional.ofNullable(applicationContainerContextOption.orNull), |
| Optional.ofNullable(applicationTaskContextOption.orNull), Optional.ofNullable(externalContextOption.orNull)) |
| |
| // Stores the (ssp -> has this ssp caught up) mapping. An SSP shared with other |
| // task instances (e.g. a broadcast stream) may be served from an earlier offset |
| // than this task's starting offset; it is "caught up" once the consumed offset |
| // reaches the starting offset, and only then are its envelopes processed. |
| var ssp2CaughtupMapping: scala.collection.mutable.Map[SystemStreamPartition, Boolean] = |
| scala.collection.mutable.Map[SystemStreamPartition, Boolean]() |
| systemStreamPartitions.foreach(ssp2CaughtupMapping += _ -> false) |
| |
| private val config: Config = jobContext.getConfig |
| |
| val streamConfig: StreamConfig = new StreamConfig(config) |
| override val intermediateStreams: java.util.Set[String] = |
| streamConfig.getStreamIds.filter(streamConfig.getIsIntermediateStream).asJava |
| |
| val streamsToDeleteCommittedMessages: Set[String] = streamConfig.getStreamIds |
| .filter(streamConfig.getDeleteCommittedMessages) |
| .map(streamConfig.getPhysicalName) |
| .toSet |
| |
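| /** |
| * Registers this task's system stream partitions with the offset manager so that |
| * starting offsets and last processed offsets can be tracked for them. |
| */ |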
| def registerOffsets { |
| debug("Registering offsets for taskName: %s" format taskName) |
| offsetManager.register(taskName, systemStreamPartitions) |
| } |
| |
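| /** Initializes the table manager with the task's context, if one is configured. */ |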
| def startTableManager { |
| if (tableManager != null) { |
| debug("Starting table manager for taskName: %s" format taskName) |
| |
| tableManager.init(context) |
| } else { |
| debug("Skipping table manager initialization for taskName: %s" format taskName) |
| } |
| } |
| |
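| /** |
| * Initializes the task: seeds the caught-up mapping, commits immediately when transactional |
| * state restore is enabled and commits are on (task.commit.ms > 0), invokes InitableTask.init |
| * if implemented, and starts any application-defined task context. |
| */ |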
| def initTask { |
| initCaughtUpMapping() |
| |
| val taskConfig = new TaskConfig(config) |
| if (taskConfig.getTransactionalStateRestoreEnabled() && taskConfig.getCommitMs > 0) { |
| // Commit immediately so the trimmed changelog messages |
| // will be sealed in a checkpoint |
| commit |
| } |
| |
| if (isInitableTask) { |
| debug("Initializing task for taskName: %s" format taskName) |
| |
| task.asInstanceOf[InitableTask].init(context) |
| } else { |
| debug("Skipping task initialization for taskName: %s" format taskName) |
| } |
| applicationTaskContextOption.foreach(applicationTaskContext => { |
| debug("Starting application-defined task context for taskName: %s" format taskName) |
| applicationTaskContext.start() |
| }) |
| } |
| |
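| /** Registers this task's collector with the underlying system producers. */ |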
| def registerProducers { |
| debug("Registering producers for taskName: %s" format taskName) |
| |
| collector.register |
| } |
| |
| /** |
| * Computes the starting offset for each partition assigned to the task and registers it |
| * with the underlying [[org.apache.samza.system.SystemConsumers]]. |
| * |
| * The starting offset for a partition of the task is computed as follows: |
| * |
| * 1. If a startpoint exists for the task's system stream partition and it resolves to an offset, then the resolved offset is used as the starting offset. |
| * 2. Otherwise, the checkpointed offset for the system stream partition is used as the starting offset. |
| */ |
| def registerConsumers() { |
| debug("Registering consumers for taskName: %s" format taskName) |
| systemStreamPartitions.foreach(systemStreamPartition => { |
| val startingOffset: String = getStartingOffset(systemStreamPartition) |
| consumerMultiplexer.register(systemStreamPartition, startingOffset) |
| metrics.addOffsetGauge(systemStreamPartition, () => offsetManager.getLastProcessedOffset(taskName, systemStreamPartition).orNull) |
| }) |
| } |
| |
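| /** |
| * Processes an incoming message envelope by delegating to AsyncStreamTask.processAsync. |
| * Envelopes for an SSP are handed to the task only once that SSP has caught up with its |
| * starting offset; earlier envelopes are skipped. |
| */ |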
| def process(envelope: IncomingMessageEnvelope, coordinator: ReadableCoordinator, |
| callbackFactory: TaskCallbackFactory) { |
| metrics.processes.inc |
| |
| val incomingMessageSsp = envelope.getSystemStreamPartition |
| |
| if (!ssp2CaughtupMapping.getOrElse(incomingMessageSsp, |
| throw new SamzaException(incomingMessageSsp + " is not registered!"))) { |
| checkCaughtUp(envelope) |
| } |
| |
| if (ssp2CaughtupMapping(incomingMessageSsp)) { |
| metrics.messagesActuallyProcessed.inc |
| |
| trace("Processing incoming message envelope for taskName and SSP: %s, %s" |
| format (taskName, incomingMessageSsp)) |
| |
| exceptionHandler.maybeHandle { |
| val callback = callbackFactory.createCallback() |
| task.asInstanceOf[AsyncStreamTask].processAsync(envelope, collector, coordinator, callback) |
| } |
| } |
| } |
| |
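| /** Notifies the task of end-of-stream, if it implements EndOfStreamListenerTask. */ |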
| def endOfStream(coordinator: ReadableCoordinator): Unit = { |
| if (isEndOfStreamListenerTask) { |
| exceptionHandler.maybeHandle { |
| task.asInstanceOf[EndOfStreamListenerTask].onEndOfStream(collector, coordinator) |
| } |
| } |
| } |
| |
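| /** Invokes WindowableTask.window, if the task implements WindowableTask. */ |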
| def window(coordinator: ReadableCoordinator) { |
| if (isWindowableTask) { |
| trace("Windowing for taskName: %s" format taskName) |
| |
| metrics.windows.inc |
| |
| exceptionHandler.maybeHandle { |
| task.asInstanceOf[WindowableTask].window(collector, coordinator) |
| } |
| } |
| } |
| |
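| /** |
| * Fires all ready epoch-time timers, invoking each registered ScheduledCallback with its |
| * timer key, the collector, and the coordinator. As a rough sketch (assuming the callback |
| * scheduler exposed on the task context; the key and delay below are illustrative): |
| * |
| * {{{ |
| * // hypothetical registration from task code; fires through scheduler() once ready |
| * val callbackScheduler = context.getTaskContext.getCallbackScheduler |
| * callbackScheduler.scheduleCallback("my-timer", System.currentTimeMillis + 60000, callback) |
| * }}} |
| */ |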
| def scheduler(coordinator: ReadableCoordinator) { |
| trace("Scheduler for taskName: %s" format taskName) |
| |
| exceptionHandler.maybeHandle { |
| epochTimeScheduler.removeReadyTimers().entrySet().foreach { entry => |
| entry.getValue.asInstanceOf[ScheduledCallback[Any]].onCallback(entry.getKey.getKey, collector, coordinator) |
| } |
| } |
| } |
| |
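| /** |
| * Commits this task in a fixed order: builds the input checkpoint, flushes producers, |
| * tables, and state stores, checkpoints the stores under a fresh checkpoint id, writes the |
| * combined input and changelog checkpoint, removes old checkpoint stores, and finally |
| * deletes committed messages from streams configured for deletion. |
| */ |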
| def commit { |
| metrics.commits.inc |
| |
| val allCheckpointOffsets = new java.util.HashMap[SystemStreamPartition, String]() |
| val inputCheckpoint = offsetManager.buildCheckpoint(taskName) |
| if (inputCheckpoint != null) { |
| trace("Got input offsets for taskName: %s as: %s" format(taskName, inputCheckpoint.getOffsets)) |
| allCheckpointOffsets.putAll(inputCheckpoint.getOffsets) |
| } |
| |
| trace("Flushing producers for taskName: %s" format taskName) |
| collector.flush |
| |
| if (tableManager != null) { |
| trace("Flushing tables for taskName: %s" format taskName) |
| tableManager.flush() |
| } |
| |
| var newestChangelogOffsets: Map[SystemStreamPartition, Option[String]] = null |
| if (storageManager != null) { |
| trace("Flushing state stores for taskName: %s" format taskName) |
| newestChangelogOffsets = storageManager.flush() |
| trace("Got newest changelog offsets for taskName: %s as: %s " format(taskName, newestChangelogOffsets)) |
| } |
| |
| val checkpointId = CheckpointId.create() |
| if (storageManager != null && newestChangelogOffsets != null) { |
| trace("Checkpointing stores for taskName: %s with checkpoint id: %s" format (taskName, checkpointId)) |
| storageManager.checkpoint(checkpointId, newestChangelogOffsets.toMap) |
| } |
| |
| if (newestChangelogOffsets != null) { |
| newestChangelogOffsets.foreach {case (ssp, newestOffsetOption) => |
| val offset = new CheckpointedChangelogOffset(checkpointId, newestOffsetOption.orNull).toString |
| allCheckpointOffsets.put(ssp, offset) |
| } |
| } |
| val checkpoint = new Checkpoint(allCheckpointOffsets) |
| trace("Got combined checkpoint offsets for taskName: %s as: %s" format (taskName, allCheckpointOffsets)) |
| |
| offsetManager.writeCheckpoint(taskName, checkpoint) |
| |
| if (storageManager != null) { |
| trace("Remove old checkpoint stores for taskName: %s" format taskName) |
| try { |
| storageManager.removeOldCheckpoints(checkpointId) |
| } catch { |
| case e: Exception => error("Failed to remove old checkpoints for task: %s. Current checkpointId: %s" format (taskName, checkpointId), e) |
| } |
| } |
| |
| if (inputCheckpoint != null) { |
| trace("Deleting committed input offsets for taskName: %s" format taskName) |
| inputCheckpoint.getOffsets.asScala |
| .filter { case (ssp, _) => streamsToDeleteCommittedMessages.contains(ssp.getStream) } // Only delete data of intermediate streams |
| .groupBy { case (ssp, _) => ssp.getSystem } |
| .foreach { case (systemName: String, offsets: Map[SystemStreamPartition, String]) => |
| systemAdmins.getSystemAdmin(systemName).deleteMessages(offsets.asJava) |
| } |
| } |
| } |
| |
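| /** Stops the application-defined task context, if any, and closes the task if it is a ClosableTask. */ |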
| def shutdownTask { |
| applicationTaskContextOption.foreach(applicationTaskContext => { |
| debug("Stopping application-defined task context for taskName: %s" format taskName) |
| applicationTaskContext.stop() |
| }) |
| if (isClosableTask) { |
| debug("Shutting down stream task for taskName: %s" format taskName) |
| |
| task.asInstanceOf[ClosableTask].close |
| } else { |
| debug("Skipping stream task shutdown for taskName: %s" format taskName) |
| } |
| } |
| |
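| /** Closes the table manager, if one is configured. */ |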
| def shutdownTableManager { |
| if (tableManager != null) { |
| debug("Shutting down table manager for taskName: %s" format taskName) |
| |
| tableManager.close |
| } else { |
| debug("Skipping table manager shutdown for taskName: %s" format taskName) |
| } |
| } |
| |
| override def toString() = "TaskInstance for class %s and taskName %s." format (task.getClass.getName, taskName) |
| |
| def toDetailedString() = "TaskInstance [taskName = %s, windowable=%s, closable=%s, endOfStreamListener=%s]" format |
| (taskName, isWindowableTask, isClosableTask, isEndOfStreamListenerTask) |
| |
| /** |
| * Checks, based on the envelope's offset, whether this SSP has caught up with the task's |
| * starting offset for the SSP. If the offsets are not comparable, defaults to treating the |
| * SSP as caught up. |
| */ |
| private def checkCaughtUp(envelope: IncomingMessageEnvelope) = { |
| val incomingMessageSsp = envelope.getSystemStreamPartition |
| |
| if (IncomingMessageEnvelope.END_OF_STREAM_OFFSET.equals(envelope.getOffset)) { |
| ssp2CaughtupMapping(incomingMessageSsp) = true |
| } else { |
| systemAdmins match { |
| case null => { |
| warn("systemAdmin is null. Set all SystemStreamPartitions to caught-up") |
| ssp2CaughtupMapping(incomingMessageSsp) = true |
| } |
| case others => { |
| val startingOffset = getStartingOffset(incomingMessageSsp) |
| |
| val system = incomingMessageSsp.getSystem |
| others.getSystemAdmin(system).offsetComparator(envelope.getOffset, startingOffset) match { |
| case null => { |
| info("offsets in " + system + " is not comparable. Set all SystemStreamPartitions to caught-up") |
| ssp2CaughtupMapping(incomingMessageSsp) = true // not comparable |
| } |
| case result => { |
| if (result >= 0) { |
| info(incomingMessageSsp.toString + " has caught up.") |
| ssp2CaughtupMapping(incomingMessageSsp) = true |
| } |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * Checks, for each partition assigned to the task, whether it has already caught up to the |
| * partition's upcoming offset, and marks it as caught up if so. |
| */ |
| def initCaughtUpMapping() { |
| if (inputStreamMetadata != null && inputStreamMetadata.nonEmpty) { |
| systemStreamPartitions.foreach(ssp => { |
| if (inputStreamMetadata.contains(ssp.getSystemStream)) { |
| val partitionMetadata = inputStreamMetadata(ssp.getSystemStream) |
| .getSystemStreamPartitionMetadata.get(ssp.getPartition) |
| |
| val upcomingOffset = partitionMetadata.getUpcomingOffset |
| val startingOffset = offsetManager.getStartingOffset(taskName, ssp) |
| .getOrElse(throw new SamzaException("No offset defined for SystemStreamPartition: %s" format ssp)) |
| |
| // Mark the ssp as caught up if the starting offset is already the upcoming |
| // offset, meaning the task has previously consumed all messages in this |
| // partition and is now waiting for future incoming messages. |
| if (Objects.equals(upcomingOffset, startingOffset)) { |
| ssp2CaughtupMapping(ssp) = true |
| } |
| } |
| }) |
| } |
| } |
| |
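| /** |
| * Returns the starting offset resolved by the offset manager for the given SSP, throwing a |
| * SamzaException if none is defined. |
| */ |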
| private def getStartingOffset(systemStreamPartition: SystemStreamPartition) = { |
| val offset = offsetManager.getStartingOffset(taskName, systemStreamPartition) |
| val startingOffset = offset.getOrElse( |
| throw new SamzaException("No offset defined for SystemStreamPartition: %s" format systemStreamPartition)) |
| |
| startingOffset |
| } |
| } |