Samza's powerful Low Level Task API lets you write your application in terms of processing logic for each incoming message. When using the Low Level Task API, you implement a TaskApplication. The processing logic is defined as either a StreamTask or an AsyncStreamTask.
The Hello Samza Wikipedia applications demonstrate how to use Samza's Low Level Task API. These applications consume various events from Wikipedia, transform them, and calculate several statistics about them.
The WikipediaFeedTaskApplication demonstrates how to consume multiple Wikipedia event streams and merge them into an Apache Kafka topic.
The WikipediaParserTaskApplication demonstrates how to project the incoming events from the Apache Kafka topic to a custom JSON data type.
The WikipediaStatsTaskApplication demonstrates how to calculate and emit periodic statistics about the incoming events while using a local KV store for durability.
A TaskApplication describes the inputs, outputs, state, configuration and the processing logic for an application written using Samza's Low Level Task API.
A typical TaskApplication implementation consists of three stages: configuring the inputs, outputs and state using descriptors; adding them to the application descriptor; and defining the processing logic with a task factory.
The following example TaskApplication removes page views with “bad URLs” from the input stream:
{% highlight java %}
public class PageViewFilter implements TaskApplication {
  @Override
  public void describe(TaskApplicationDescriptor appDescriptor) {
    // Step 1: configure the inputs and outputs using descriptors
    KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("kafka")
        .withConsumerZkConnect(ImmutableList.of("..."))
        .withProducerBootstrapServers(ImmutableList.of("...", "..."));
    KafkaInputDescriptor<PageViewEvent> kid =
        ksd.getInputDescriptor("pageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
    KafkaOutputDescriptor<PageViewEvent> kod =
        ksd.getOutputDescriptor("goodPageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
    RocksDbTableDescriptor<String, Integer> badUrls =
        new RocksDbTableDescriptor<>("badUrls",
            KVSerde.of(new StringSerde(), new IntegerSerde()));

    // Step 2: add the input and output streams and tables
    appDescriptor
        .withInputStream(kid)
        .withOutputStream(kod)
        .withTable(badUrls);

    // Step 3: define the processing logic
    appDescriptor.withTaskFactory(new PageViewFilterTaskFactory());
  }
}
{% endhighlight %}
Your TaskFactory will be used to create instances of your Task in each of Samza's processors. If you're implementing a StreamTask, you can provide a StreamTaskFactory. Similarly, if you're implementing an AsyncStreamTask, you can provide an AsyncStreamTaskFactory. For example:
{% highlight java %}
public class PageViewFilterTaskFactory implements StreamTaskFactory {
  @Override
  public StreamTask createInstance() {
    return new PageViewFilterTask();
  }
}
{% endhighlight %}
Your processing logic can be implemented in a StreamTask or an AsyncStreamTask.
You can implement a StreamTask for synchronous message processing. Samza delivers messages to the task one at a time, and considers each message to be processed when the process method call returns. For example:
{% highlight java %}
public class PageViewFilterTask implements StreamTask {
  @Override
  public void process(
      IncomingMessageEnvelope envelope,
      MessageCollector collector,
      TaskCoordinator coordinator) {
    // process the message in the envelope synchronously
  }
}
{% endhighlight %}
Note that synchronous message processing does not imply sequential execution. Multiple instances of your Task class implementation may still run concurrently within a container.
You can implement an AsyncStreamTask for asynchronous message processing. This can be useful when you need to perform long-running I/O operations to process a message, e.g., making an HTTP request. For example:
{% highlight java %}
public class AsyncPageViewFilterTask implements AsyncStreamTask {
  @Override
  public void processAsync(
      IncomingMessageEnvelope envelope,
      MessageCollector collector,
      TaskCoordinator coordinator,
      TaskCallback callback) {
    // process the message asynchronously
    // invoke callback.complete or callback.failure upon completion
  }
}
{% endhighlight %}
Samza delivers the incoming message and a TaskCallback with the processAsync() method call, and considers each message to be processed when its corresponding callback.complete() or callback.failure() has been invoked. If callback.failure() is invoked, or neither callback.complete() nor callback.failure() is invoked within task.callback.timeout.ms milliseconds, Samza will shut down the running container.
If configured, Samza will keep up to task.max.concurrency number of messages processing asynchronously at a time within each Task Instance. Note that while message delivery (i.e., processAsync invocation) is guaranteed to be in-order within a stream partition, message processing may complete out of order when setting task.max.concurrency > 1.
For more details on asynchronous and concurrent processing, see the Samza Async API and Multithreading User Guide.
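As an illustrative sketch of the callback contract (the CompletableFuture usage and the isBadUrl helper are assumptions for this example, not part of the API), a task might complete or fail the callback when an asynchronous lookup finishes:

{% highlight java %}
public class AsyncPageViewFilterTask implements AsyncStreamTask {
  @Override
  public void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator, TaskCallback callback) {
    // any non-blocking, CompletionStage-based client would fit the same pattern
    CompletableFuture
        .supplyAsync(() -> isBadUrl((String) envelope.getKey()))
        .whenComplete((isBad, error) -> {
          if (error != null) {
            callback.failure(error);   // marks the message as failed
          } else {
            if (!isBad) {
              collector.send(new OutgoingMessageEnvelope(
                  new SystemStream("kafka", "goodPageViewEvent"),
                  envelope.getKey(), envelope.getMessage()));
            }
            callback.complete();       // marks the message as processed
          }
        });
  }

  // hypothetical helper standing in for a long-running I/O check, e.g., an HTTP request
  private boolean isBadUrl(String url) {
    return false;
  }
}
{% endhighlight %}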
There are a few other interfaces you can implement in your StreamTask or AsyncStreamTask that provide additional functionality.
You can implement the InitableTask interface to access the Context. Context provides access to any runtime objects you need in your task, whether they're provided by the framework, or your own.
{% highlight java %}
public interface InitableTask {
  void init(Context context) throws Exception;
}
{% endhighlight %}
You can implement the ClosableTask interface to clean up any runtime state during shutdown. This interface is deprecated; it's recommended to use the ApplicationContainerContext and ApplicationTaskContext APIs to manage the lifecycle of any runtime objects.
{% highlight java %}
public interface ClosableTask {
  void close() throws Exception;
}
{% endhighlight %}
You can implement the WindowableTask interface to implement processing logic that is invoked periodically by the framework.
{% highlight java %}
public interface WindowableTask {
  void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception;
}
{% endhighlight %}
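As a sketch of how this might be used (the output stream name here is an assumption; the window interval is configured via task.window.ms), a task could accumulate a count between window() invocations and emit it periodically:

{% highlight java %}
public class PageViewCounterTask implements StreamTask, WindowableTask {
  private int pageViewCount = 0;

  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator) {
    pageViewCount++; // accumulate state between window() invocations
  }

  @Override
  public void window(MessageCollector collector, TaskCoordinator coordinator) {
    // emit the count accumulated during this window, then reset
    collector.send(new OutgoingMessageEnvelope(
        new SystemStream("kafka", "pageViewCounts"), pageViewCount));
    pageViewCount = 0;
  }
}
{% endhighlight %}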
You can implement the EndOfStreamListenerTask interface to implement processing logic that is invoked when a Task Instance has reached the end of all input SystemStreamPartitions it's consuming. This is typically relevant when running Samza as a batch job.
{% highlight java %}
public interface EndOfStreamListenerTask {
  void onEndOfStream(MessageCollector collector, TaskCoordinator coordinator) throws Exception;
}
{% endhighlight %}
Samza calls your Task instance's process or processAsync method with each incoming message on your input streams. The IncomingMessageEnvelope can be used to obtain the following information: the de-serialized key, the de-serialized message, and the SystemStreamPartition that the message came from.
The key and message objects need to be cast to the correct type in your Task implementation based on the Serde provided for the InputDescriptor for the input stream.
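For instance, with the StringSerde key and JsonSerdeV2&lt;PageViewEvent&gt; message serdes used in the earlier descriptor example, the casts might look like this (a sketch, assuming those serdes):

{% highlight java %}
@Override
public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
    TaskCoordinator coordinator) {
  // the casts must match the serdes configured on the InputDescriptor
  String key = (String) envelope.getKey();
  PageViewEvent pageView = (PageViewEvent) envelope.getMessage();
  // ... process the typed key and message
}
{% endhighlight %}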
The SystemStreamPartition object tells you where the message came from. It consists of three parts: the system name (e.g., "kafka"), the stream name (e.g., "PageViewEvent"), and the partition number.
If you have several input streams for your TaskApplication, you can use the SystemStreamPartition to determine what kind of message you’ve received.
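For example, a task consuming two input streams might dispatch on the stream name (the stream names here are hypothetical):

{% highlight java %}
@Override
public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
    TaskCoordinator coordinator) {
  String stream = envelope.getSystemStreamPartition().getStream();
  if ("pageViewEvent".equals(stream)) {
    // handle a page view event
  } else if ("profileEvent".equals(stream)) {
    // handle a profile update event
  }
}
{% endhighlight %}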
To send a message to a stream, you first create an OutgoingMessageEnvelope. At a minimum, you need to provide the message you want to send, and the system and stream to send it to. Optionally you can specify the partitioning key and other parameters. See the javadoc for details.
You can then send the OutgoingMessageEnvelope using the MessageCollector provided with the process() or processAsync() call. You must use the MessageCollector delivered for the message you're currently processing. Holding on to a MessageCollector and reusing it later will cause your messages to not be sent correctly.
{% highlight java %}
/** When a task wishes to send a message, it uses this interface. */
public interface MessageCollector {
  void send(OutgoingMessageEnvelope envelope);
}
{% endhighlight %}
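Putting the two steps together, a process() implementation might construct and send an envelope as follows (the system and stream names are assumptions for this sketch):

{% highlight java %}
@Override
public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
    TaskCoordinator coordinator) {
  // send the incoming message to the "goodPageViewEvent" stream on the "kafka" system,
  // using the incoming key as the optional partitioning key
  collector.send(new OutgoingMessageEnvelope(
      new SystemStream("kafka", "goodPageViewEvent"),
      envelope.getKey(), envelope.getMessage()));
}
{% endhighlight %}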
A Table is an abstraction for data sources that support random access by key. It is an evolution of the older KeyValueStore API. It offers support for both local and remote data sources and composition through hybrid tables. For remote data sources, a RemoteTable provides optimized access with caching, rate-limiting, and retry support. Depending on the implementation, a Table can be a ReadableTable or a ReadWriteTable.
In the Low Level API, you register the TableDescriptor with the TaskApplicationDescriptor, and then obtain a Table reference in your task (for example, in an InitableTask's init method) via context.getTaskContext().getTable(). For example:
{% highlight java %}
public class PageViewFilterTask implements StreamTask, InitableTask {
  private final SystemStream outputStream = new SystemStream("kafka", "goodPageViewEvent");
  private ReadWriteTable<String, Integer> badUrlsTable;

  @Override
  public void init(Context context) {
    badUrlsTable = (ReadWriteTable<String, Integer>) context.getTaskContext().getTable("badUrls");
  }

  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator) {
    String key = (String) envelope.getKey();
    if (badUrlsTable.containsKey(key)) {
      // skip the message, increment the counter, do not send it
      badUrlsTable.put(key, badUrlsTable.get(key) + 1);
    } else {
      collector.send(new OutgoingMessageEnvelope(outputStream, key, envelope.getMessage()));
    }
  }
}
{% endhighlight %}
To populate a local Table from secondary data sources, you can use side inputs to specify the source stream. Additionally, the table descriptor takes a SideInputsProcessor that is applied to each entry before it is written to the table. The TableDescriptor that is registered with the TaskApplicationDescriptor can be used to specify the side input properties. The following code snippet shows a sample TableDescriptor for a local table that is backed by side inputs.
{% highlight java %}
RocksDbTableDescriptor<String, Profile> tableDesc =
    new RocksDbTableDescriptor<>("viewCounts",
        KVSerde.of(new StringSerde(), new JsonSerdeV2<>(Profile.class)))
    .withSideInputs(ImmutableList.of("profile"))
    .withSideInputsProcessor((message, store) -> {
      ...
    });
{% endhighlight %}
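As a sketch of what the elided processor body might look like (the Profile accessor is hypothetical), the SideInputsProcessor returns the entries to write to the table for each incoming side-input message:

{% highlight java %}
.withSideInputsProcessor((message, store) -> {
  // derive the table entries to write from the side-input message
  Profile profile = (Profile) message.getMessage();
  return ImmutableList.of(new Entry<>(profile.getMemberId(), profile));
});
{% endhighlight %}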
For legacy Low Level API applications, you can continue to specify your system, stream, and store properties along with your task.class in configuration. An incomplete configuration example for a legacy task application looks like this (see the configuration documentation for more details):
{% highlight jproperties %}
# This is the Task class that Samza will instantiate when the job is run
task.class=com.example.samza.PageViewFilterTask

# Define a system called "kafka" (you can give it any name, and you can define
# multiple systems if you want to process messages from different sources)
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory

# The job consumes a topic called "PageViewEvent" from the "kafka" system
task.inputs=kafka.PageViewEvent

# Define a serializer/deserializer called "json" which parses JSON messages
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory

# Use the "json" serializer for messages in the "PageViewEvent" topic
systems.kafka.streams.PageViewEvent.samza.msg.serde=json

# Use "ProfileEvent" from the "kafka" system as side input for "profile-store"
stores.profile-store.side.inputs=kafka.ProfileEvent

# Use "MySideInputsProcessorFactory" to instantiate the SideInputsProcessor
# that will be applied to "ProfileEvent" before writing to "profile-store"
stores.profile-store.side.inputs.processor.factory=org.apache.samza.MySideInputsProcessorFactory
{% endhighlight %}