blob: 338f4c9e00082c88554fbf478ec517c329a47efb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.task
import org.apache.samza.system.OutgoingMessageEnvelope
import org.apache.samza.system.SystemProducers
import org.apache.samza.container.TaskInstanceMetrics
import org.apache.samza.util.Logging
/**
* TaskInstanceCollector is an implementation of MessageCollector that sends
* messages to the underlying SystemProducers class immediately. Since the
* SystemProducers object is typically shared between TaskInstances (in order
* to share connections to the underlying system), this class is necessary as
* a way to fill in the "source" for the underlying SystemProducers. If a
* StreamTask calls collector.send, the messages will be immediately given to
* the SystemProducers, which will in turn forward the outgoing message
* immediately to the underlying producer for the system. Note that, if the
* underlying system producer buffers messages, then using this collector will
* still not result in an immediate send, but calling flush on it should.
*/
class TaskInstanceCollector(
producerMultiplexer: SystemProducers,
metrics: TaskInstanceMetrics = new TaskInstanceMetrics) extends MessageCollector with Logging {
/**
* Register as a new source with SystemProducers. This allows this collector
* to send messages to the SystemProducers.
*/
def register {
debug("Registering source: %s" format metrics.source)
producerMultiplexer.register(metrics.source)
}
/**
* Sends a message to the underlying SystemProducers.
*
* @param envelope An outgoing envelope that's to be sent to SystemProducers.
*/
def send(envelope: OutgoingMessageEnvelope) {
trace("Sending message from source: %s, %s" format (metrics.source, envelope))
metrics.sends.inc
producerMultiplexer.send(metrics.source, envelope)
}
/**
* Flushes the underlying SystemProducers.
*/
def flush {
trace("Flushing messages from source: %s" format metrics.source)
metrics.flushes.inc
producerMultiplexer.flush(metrics.source)
}
}