blob: a8fe2b8bb785a68b22c7564512f545bf0a5cbd57 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.system.chooser;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStreamPartition;
/**
* MessageChooser is an interface for programmatic fine-grained control over
* stream consumption.
*
* <p>Consider the case of a Samza task consuming multiple streams, where some
* streams may be from live systems that have stricter SLA requirements and must
* always be prioritized over other streams that may be from batch systems.
* MessageChooser allows developers to inject message prioritization logic into
* the SamzaContainer.
*
* <p>In general, the MessageChooser can be used to prioritize certain systems,
* streams or partitions over others. It can also be used to throttle certain
* partitions, by choosing not to return messages even though they are
* available. The MessageChooser can also throttle the entire SamzaContainer by
* performing a blocking operation, such as Thread.sleep.
*
* <p>The manner in which MessageChooser is used is:
*
* <ul>
* <li>SystemConsumers buffer messages from all SystemStreamPartitions as they
* become available.</li>
* <li>If MessageChooser has no messages for a given SystemStreamPartition, and
* a SystemConsumer has a message in its buffer for the SystemStreamPartition,
* the MessageChooser will be updated once with the next message in the buffer.</li>
* <li>When SamzaContainer is ready to process another message, it calls
* SystemConsumers.choose, which in turn calls {@link MessageChooser#choose}.</li>
* </ul>
*
* <p>Since the MessageChooser only receives one message at a time per
* {@link SystemStreamPartition}, it can be used to order messages between different
* SystemStreamPartitions, but it can't be used to re-order messages within a
* single SystemStreamPartition (a buffered sort). This must be done within a
* StreamTask.
*
* <p>The contract between the MessageChooser and the SystemConsumers is:
*
* <ul>
* <li>{@link #update(IncomingMessageEnvelope)} can be called multiple times
* before {@link #choose()} is called.</li>
* <li>If {@link #choose()} returns null, that means no envelopes should be
* processed at the moment.</li>
* <li>A MessageChooser may elect to return null when {@link #choose()} is
* called, even if unprocessed messages have been given by the update method.</li>
* <li>A MessageChooser will not have any of its in-memory state restored in the
* event of a failure.</li>
* <li>Blocking operations (such as Thread.sleep) will block all processing in
* the entire SamzaContainer.</li>
* <li>A MessageChooser should never return the same envelope more than once.</li>
* <li>Non-deterministic (e.g. time-based) MessageChoosers are allowed.</li>
* <li>A MessageChooser does not need to be thread-safe.</li>
* </ul>
*/
public interface MessageChooser {

  /**
   * Announces a {@link SystemStreamPartition} whose envelopes this chooser
   * will be handling. Registration takes place strictly before
   * {@link #start()}; it is never invoked at any other time.
   *
   * @param systemStreamPartition
   *          The SystemStreamPartition that envelopes will arrive from.
   * @param offset
   *          Offset of the first message to expect for the
   *          system/stream/partition being registered. For instance, when
   *          "7" is supplied, the first envelope this chooser is updated
   *          with for that system/stream/partition will have an offset of
   *          "7".
   */
  void register(SystemStreamPartition systemStreamPartition, String offset);

  /**
   * Tells the chooser that it is about to begin receiving
   * {@link #update(IncomingMessageEnvelope)} and {@link #choose()} calls.
   * Invoked once every SystemStreamPartition has been registered.
   */
  void start();

  /**
   * Hands the chooser a new envelope that is available for processing.
   *
   * <p>At most one envelope is outstanding per system/stream/partition
   * combination. For example, once update has been called with an envelope
   * from partition 7 of kafka.mystream, it will not be called again with an
   * envelope from partition 7 of kafka.mystream until the earlier envelope
   * has been returned via {@link #choose()}.
   *
   * <p>Update is only invoked after the chooser has been started.
   *
   * @param envelope
   *          An unprocessed envelope.
   */
  void update(IncomingMessageEnvelope envelope);

  /**
   * Asks the chooser for the next envelope to process. Invoked whenever the
   * SamzaContainer is ready to process a new message.
   *
   * <p>The chooser may elect to return any envelope that it has been given
   * via {@link #update(IncomingMessageEnvelope)} and has not yet returned.
   *
   * <p>Choose is only called after the chooser has been started.
   *
   * @return The next envelope to process, or {@code null} if the chooser
   *         has no messages or doesn't want to process any at the moment.
   */
  IncomingMessageEnvelope choose();

  /**
   * Tells the chooser that it is about to be discarded. After the chooser
   * is stopped, no more messages will be given to it.
   */
  void stop();
}