When writing a stream processor for Samza, you must implement the StreamTask interface:
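{% highlight java %}
package com.example.samza;

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

public class MyTaskClass implements StreamTask {

  @Override
  public void process(IncomingMessageEnvelope envelope,
                      MessageCollector collector,
                      TaskCoordinator coordinator) {
    // process message
  }
}
{% endhighlight %}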
When you run your job, Samza will create several instances of your class (potentially on multiple machines). These task instances process the messages in the input streams.
In your job's configuration you can tell Samza which streams you want to consume. An incomplete example could look like this (see the configuration documentation for more detail):
{% highlight jproperties %}
task.class=com.example.samza.MyTaskClass
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
task.inputs=kafka.PageViewEvent
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
systems.kafka.streams.PageViewEvent.samza.msg.serde=json
{% endhighlight %}
For each message that Samza receives from the task's input streams, the process method is called. The envelope contains three things of importance: the message, the key, and the stream that the message came from.
{% highlight java %}
/** Every message that is delivered to a StreamTask is wrapped
 * in an IncomingMessageEnvelope. */
public class IncomingMessageEnvelope {
  /** A deserialized message. */
  Object getMessage() { ... }

  /** A deserialized key. */
  Object getKey() { ... }

  /** The stream and partition that this message came from. */
  SystemStreamPartition getSystemStreamPartition() { ... }
}
{% endhighlight %}
The key and message are declared as Object, and need to be cast to the correct type. If you don't configure a serializer/deserializer, they are typically Java byte arrays. A deserializer can convert these bytes into any other type; for example, the JSON deserializer mentioned above parses the byte array into java.util.Map, java.util.List and String objects.
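As a rough illustration of the cast, the sketch below treats a JSON-deserialized message as a java.util.Map and reads one field from it. To stay self-contained it does not use Samza's classes: `Envelope` is a hypothetical stand-in for IncomingMessageEnvelope that exposes only getKey() and getMessage(), and the `userId` and `url` field names are invented for the example.

```java
import java.util.Map;

/** Hypothetical stand-in for IncomingMessageEnvelope; not part of Samza. */
final class Envelope {
  private final Object key;
  private final Object message;

  Envelope(Object key, Object message) {
    this.key = key;
    this.message = message;
  }

  Object getKey() { return key; }

  Object getMessage() { return message; }
}

final class PageViewReader {
  /** Casts the message to a Map and returns its (assumed) "url" field. */
  @SuppressWarnings("unchecked")
  static String urlOf(Envelope envelope) {
    Object message = envelope.getMessage();
    if (!(message instanceof Map)) {
      // Without a configured serde the message would typically be a byte[].
      throw new IllegalArgumentException("Expected a Map but got " + message);
    }
    Map<String, Object> event = (Map<String, Object>) message;
    return (String) event.get("url");
  }
}
```

Checking the type before casting is a design choice for the sketch: it turns a misconfigured serde into a descriptive error rather than a bare ClassCastException.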
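The getSystemStreamPartition() method returns a SystemStreamPartition object, which tells you where the message came from. It consists of three parts: the name of the system that provides the stream, the name of the stream (topic or queue) within that system, and the partition within the stream.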
The API looks like this:
{% highlight java %}
/** A triple of system name, stream name and partition. */
public class SystemStreamPartition extends SystemStream {

  /** The name of the system which provides this stream. It is
      defined in the Samza job's configuration. */
  public String getSystem() { ... }

  /** The name of the stream/topic/queue within the system. */
  public String getStream() { ... }

  /** The partition within the stream. */
  public Partition getPartition() { ... }
}
{% endhighlight %}
In the example job configuration above, the system name is "kafka" and the stream name is "PageViewEvent". (The name "kafka" isn't special; you can give your system any name you want.) If you have several input streams feeding into your StreamTask, you can use the SystemStreamPartition to determine what kind of message you've received.
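One way to picture that dispatch is the sketch below, which branches on the stream name. It is self-contained rather than written against Samza: `StreamOrigin` is a hypothetical stand-in for SystemStreamPartition (with the partition simplified to an int), and `AdClickEvent` is an invented second input stream that does not appear in the configuration above.

```java
/** Hypothetical stand-in for SystemStreamPartition; not part of Samza. */
final class StreamOrigin {
  private final String system;
  private final String stream;
  private final int partition; // simplified; Samza uses a Partition object

  StreamOrigin(String system, String stream, int partition) {
    this.system = system;
    this.stream = stream;
    this.partition = partition;
  }

  String getSystem() { return system; }

  String getStream() { return stream; }

  int getPartition() { return partition; }
}

final class MessageRouter {
  /** Decides what kind of message arrived, based on the stream it came from. */
  static String describe(StreamOrigin origin) {
    switch (origin.getStream()) {
      case "PageViewEvent":
        return "page view";
      case "AdClickEvent": // hypothetical second input stream
        return "ad click";
      default:
        return "unknown";
    }
  }
}
```

In a real task, the same switch would sit inside process() and use the value returned by envelope.getSystemStreamPartition().getStream().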
What about sending messages? If you take a look at the process() method in StreamTask, you'll see that you get a MessageCollector.
{% highlight java %}
/** When a task wishes to send a message, it uses this interface. */
public interface MessageCollector {
  void send(OutgoingMessageEnvelope envelope);
}
{% endhighlight %}
To send a message, you create an OutgoingMessageEnvelope object and pass it to the message collector. At a minimum, the envelope specifies the message you want to send, and the system and stream name to send it to. Optionally you can specify the partitioning key and other parameters. See the javadoc for details.
NOTE: Please only use the MessageCollector object within the process() method. If you hold on to a MessageCollector instance and use it again later, your messages may not be sent correctly.
For example, here's a simple task that splits each input message into words, and emits each word as a separate message:
{% highlight java %}
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

public class SplitStringIntoWords implements StreamTask {

  // Send outgoing messages to a stream called "words"
  // in the "kafka" system.
  private static final SystemStream OUTPUT_STREAM =
    new SystemStream("kafka", "words");

  @Override
  public void process(IncomingMessageEnvelope envelope,
                      MessageCollector collector,
                      TaskCoordinator coordinator) {
    String message = (String) envelope.getMessage();

    for (String word : message.split(" ")) {
      // Use the word as the key, and 1 as the value.
      // A second task can add the 1's to get the word count.
      collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, word, 1));
    }
  }
}
{% endhighlight %}
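The comment in the example mentions a second task that adds up the 1's. The sketch below shows only that summing step, under some loud assumptions: `WordCounter` is a hypothetical helper, not a Samza class, and it keeps its totals in an in-memory HashMap, which would not survive a task restart. In an actual job this logic would be called from the process() method of another StreamTask, with the word and count cast from the envelope's key and message.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that sums (word, count) messages; not part of Samza. */
final class WordCounter {
  // In-memory totals only; an assumption made to keep the sketch short.
  private final Map<String, Integer> counts = new HashMap<>();

  /** Adds one (word, count) message and returns the new total for that word. */
  int add(String word, int count) {
    return counts.merge(word, count, Integer::sum);
  }

  /** Returns the current total for a word, or 0 if it has not been seen. */
  int get(String word) {
    return counts.getOrDefault(word, 0);
  }
}
```

Because the word is used as the message key, a setup that partitions by key would route every occurrence of a given word to the same counting task, which is what makes a per-task total meaningful.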