blob: 338f4c9e00082c88554fbf478ec517c329a47efb [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.samza.task
import org.apache.samza.system.OutgoingMessageEnvelope
import org.apache.samza.system.SystemProducers
import org.apache.samza.container.TaskInstanceMetrics
import org.apache.samza.util.Logging
* TaskInstanceCollector is an implementation of MessageCollector that sends
* messages to the underlying SystemProducers class immediately. Since the
* SystemProducers object is typically shared between TaskInstances (in order
* to share connections to the underlying system), this class is necessary as
* a way to fill in the "source" for the underlying SystemProducers. If a
* StreamTask calls collector.send, the messages will be immediately given to
* the SystemProducers, which will in turn forward the outgoing message
* immediately to the underlying producer for the system. Note that, if the
* underlying system producer buffers messages, then using this collector will
* still not result in an immediate send, but calling flush on it should.
class TaskInstanceCollector(
producerMultiplexer: SystemProducers,
metrics: TaskInstanceMetrics = new TaskInstanceMetrics) extends MessageCollector with Logging {
* Register as a new source with SystemProducers. This allows this collector
* to send messages to the SystemProducers.
def register {
debug("Registering source: %s" format metrics.source)
* Sends a message to the underlying SystemProducers.
* @param envelope An outgoing envelope that's to be sent to SystemProducers.
def send(envelope: OutgoingMessageEnvelope) {
trace("Sending message from source: %s, %s" format (metrics.source, envelope))
producerMultiplexer.send(metrics.source, envelope)
* Flushes the underlying SystemProducers.
def flush {
trace("Flushing messages from source: %s" format metrics.source)