You can configure your Samza jobs to process data from Azure Event Hubs, Microsoft's data streaming service. An event hub
is similar to a Kafka topic and can have multiple partitions with producers and consumers. Each message produced to or consumed from an event hub is an instance of EventData.
Samza's EventHubSystemConsumer wraps each EventData in an EventHubIncomingMessageEnvelope. The key of the envelope is set to the partition key of the EventData, and the message is taken from the EventData body.
To configure Samza to consume from Event Hubs streams:
```properties
# define an event hub system factory with your identifier, e.g. eh-system
systems.eh-system.samza.factory=org.apache.samza.system.eventhub.EventHubSystemFactory

# define your streams
systems.eh-system.stream.list=input0, output0

# define required properties for your streams
systems.eh-system.streams.input0.eventhubs.namespace=YOUR-STREAM-NAMESPACE
systems.eh-system.streams.input0.eventhubs.entitypath=YOUR-ENTITY-NAME
systems.eh-system.streams.input0.eventhubs.sas.keyname=YOUR-SAS-KEY-NAME
systems.eh-system.streams.input0.eventhubs.sas.token=YOUR-SAS-KEY-TOKEN

systems.eh-system.streams.output0.eventhubs.namespace=YOUR-STREAM-NAMESPACE
systems.eh-system.streams.output0.eventhubs.entitypath=YOUR-ENTITY-NAME
systems.eh-system.streams.output0.eventhubs.sas.keyname=YOUR-SAS-KEY-NAME
systems.eh-system.streams.output0.eventhubs.sas.token=YOUR-SAS-KEY-TOKEN
```
For each stream you must provide the four values needed to access its Event Hubs entity: YOUR-STREAM-NAMESPACE, YOUR-ENTITY-NAME, YOUR-SAS-KEY-NAME, and YOUR-SAS-KEY-TOKEN.
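If you generate job configs programmatically, the key pattern above can be captured in a small helper. The sketch below is hypothetical: EventHubsStreamConfigSketch.addStream is not a Samza API. It fills a java.util.Properties with the four required values for one stream, using the property names from the listing above.

```java
import java.util.Properties;

// Hypothetical helper (not part of Samza): writes the four per-stream
// Event Hubs credential properties into a Properties object.
class EventHubsStreamConfigSketch {

    static void addStream(Properties props, String system, String stream,
                          String namespace, String entityPath,
                          String sasKeyName, String sasToken) {
        // Every key shares the prefix systems.<system>.streams.<stream>.eventhubs.
        String prefix = "systems." + system + ".streams." + stream + ".eventhubs.";
        props.setProperty(prefix + "namespace", namespace);
        props.setProperty(prefix + "entitypath", entityPath);
        props.setProperty(prefix + "sas.keyname", sasKeyName);
        props.setProperty(prefix + "sas.token", sasToken);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("systems.eh-system.samza.factory",
                "org.apache.samza.system.eventhub.EventHubSystemFactory");
        props.setProperty("systems.eh-system.stream.list", "input0, output0");
        for (String stream : new String[] {"input0", "output0"}) {
            addStream(props, "eh-system", stream, "YOUR-STREAM-NAMESPACE",
                    "YOUR-ENTITY-NAME", "YOUR-SAS-KEY-NAME", "YOUR-SAS-KEY-TOKEN");
        }
        // Print in sorted order so the output is easy to compare with the listing above.
        props.stringPropertyNames().stream().sorted()
                .forEach(key -> System.out.println(key + "=" + props.getProperty(key)));
    }
}
```

The helper only builds key strings, so a typo in a stream name shows up in one place instead of four.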
Similarly, you can configure your Samza job to write to Event Hubs by sending an OutgoingMessageEnvelope to the output stream:
```java
// Send a keyed message to the "output0" stream of the "eh-system" system
OutgoingMessageEnvelope envelope =
    new OutgoingMessageEnvelope(new SystemStream("eh-system", "output0"), key, message);
collector.send(envelope);
```
Each OutgoingMessageEnvelope is converted into an EventData instance whose body is set to the message in the envelope. Additionally, the key and the produce timestamp are set as properties in the EventData before it is sent to Event Hubs.
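The following is a conceptual model of this conversion. SketchEventData and OutgoingConversionSketch are hypothetical stand-ins, not Samza's or the Azure SDK's classes. The property names "key" and "produce-timestamp" are also assumptions chosen for illustration; check the connector source for the names it actually uses.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for an EventData: a byte[] body plus string-keyed properties.
class SketchEventData {
    final byte[] body;
    final Map<String, Object> properties = new HashMap<>();

    SketchEventData(byte[] body) {
        this.body = body;
    }
}

class OutgoingConversionSketch {

    // Assumed property names, for illustration only.
    static final String KEY_PROPERTY = "key";
    static final String PRODUCE_TIMESTAMP_PROPERTY = "produce-timestamp";

    // Mirrors the described conversion: message -> body, key and timestamp -> properties.
    static SketchEventData toEventData(Object key, byte[] message, long produceTimestampMillis) {
        SketchEventData eventData = new SketchEventData(message);
        eventData.properties.put(KEY_PROPERTY, key);
        eventData.properties.put(PRODUCE_TIMESTAMP_PROPERTY, produceTimestampMillis);
        return eventData;
    }

    public static void main(String[] args) {
        byte[] message = "hello".getBytes(StandardCharsets.UTF_8);
        SketchEventData eventData = toEventData("user-42", message, System.currentTimeMillis());
        System.out.println(new String(eventData.body, StandardCharsets.UTF_8)); // hello
        System.out.println(eventData.properties.keySet());
    }
}
```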
The partition.method property determines how outgoing messages are partitioned. Valid values for this config are EVENT_HUB_HASHING, PARTITION_KEY_AS_PARTITION, or ROUND_ROBIN.
EVENT_HUB_HASHING: This is the default. Samza computes the partition for an outgoing message from the hash of its partition key, which ensures that events with the same key are sent to the same partition. If this option is chosen, the partition key should be a string. If the partition key is not set, the message key is used for partitioning.
PARTITION_KEY_AS_PARTITION: Each message is sent to the partition specified by its partition key, which therefore must be an integer. If the key is greater than or equal to the number of partitions, a modulo operation is applied to it. As with EVENT_HUB_HASHING, the message key is used if the partition key is not specified.
ROUND_ROBIN: Outgoing messages are distributed round-robin across all partitions. Both the key and the partition key of the message are ignored.
```properties
systems.eh-system.partition.method = EVENT_HUB_HASHING
```
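The sketch below models how each partition.method value could map an outgoing message to a partition index. The class and method names are hypothetical. String.hashCode() stands in for the hash actually used for EVENT_HUB_HASHING, so the partition numbers it produces will not match a real event hub.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative model of the three partition.method strategies (hypothetical class).
// String.hashCode() is only a stand-in for the hash really used by EVENT_HUB_HASHING.
class PartitionMethodSketch {

    enum PartitionMethod { EVENT_HUB_HASHING, PARTITION_KEY_AS_PARTITION, ROUND_ROBIN }

    private final int partitionCount;
    private final AtomicInteger roundRobinCounter = new AtomicInteger();

    PartitionMethodSketch(int partitionCount) {
        this.partitionCount = partitionCount;
    }

    // partitionKey may be null, in which case the message key is used instead.
    int choosePartition(PartitionMethod method, Object partitionKey, Object messageKey) {
        Object key = (partitionKey != null) ? partitionKey : messageKey;
        switch (method) {
            case EVENT_HUB_HASHING:
                // Same key -> same hash -> same partition.
                return Math.floorMod(String.valueOf(key).hashCode(), partitionCount);
            case PARTITION_KEY_AS_PARTITION:
                // The key must be an integer; larger values wrap around via modulo.
                return Math.floorMod(((Number) key).intValue(), partitionCount);
            case ROUND_ROBIN:
                // Keys are ignored; partitions are used in turn.
                return Math.floorMod(roundRobinCounter.getAndIncrement(), partitionCount);
            default:
                throw new IllegalArgumentException("Unknown partition method: " + method);
        }
    }

    public static void main(String[] args) {
        PartitionMethodSketch sketch = new PartitionMethodSketch(4);
        System.out.println(sketch.choosePartition(PartitionMethod.PARTITION_KEY_AS_PARTITION, 9, null)); // 1
        System.out.println(sketch.choosePartition(PartitionMethod.EVENT_HUB_HASHING, null, "user-42"));
        for (int i = 0; i < 5; i++) {
            System.out.print(sketch.choosePartition(PartitionMethod.ROUND_ROBIN, "ignored", "ignored") + " ");
        }
        System.out.println(); // prints 0 1 2 3 0
    }
}
```

The trade-off is visible in the code. Both hashing and explicit partition keys keep related events together. Round-robin spreads load evenly but gives up any per-key ordering.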
Event Hubs supports consumer groups, which enable multiple applications to have their own view of the event stream. Each event hub has a predefined consumer group named $Default. You can define your own consumer group for your job by setting the eventhubs.consumer.group property for the stream:
```properties
systems.eh-system.streams.input0.eventhubs.consumer.group = my-group
```
By default, messages from Event Hubs are sent and received as byte arrays. You can configure a serializer and deserializer (serde) for your messages by setting msg.serde for your stream:
```properties
streams.input0.samza.msg.serde = json
streams.output0.samza.msg.serde = json
```
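Conceptually, a serde is a pair of conversions between those raw byte arrays and your message objects. The following sketch defines a UTF-8 string serde against a local interface. Its fromBytes/toBytes methods mirror the shape of Samza's org.apache.samza.serializers.Serde. SketchSerde and Utf8StringSerdeSketch are hypothetical names, used so that the example compiles without Samza on the classpath.

```java
import java.nio.charset.StandardCharsets;

// Local stand-in with the same method shape as Samza's
// org.apache.samza.serializers.Serde<T>, so the sketch compiles without Samza.
interface SketchSerde<T> {
    T fromBytes(byte[] bytes);

    byte[] toBytes(T object);
}

// Hypothetical serde: decodes message bytes as UTF-8 text and encodes back.
class Utf8StringSerdeSketch implements SketchSerde<String> {

    @Override
    public String fromBytes(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    @Override
    public byte[] toBytes(String object) {
        return object.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Utf8StringSerdeSketch serde = new Utf8StringSerdeSketch();
        byte[] wire = serde.toBytes("{\"id\": 1}"); // what would travel in the EventData body
        System.out.println(serde.fromBytes(wire));   // {"id": 1}
    }
}
```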
When the consumer reads a message from Event Hubs, it appends it to a shared producer-consumer buffer for that message's partition. The eventhubs.receive.queue.size config sets the size of this per-partition queue. A higher value typically yields higher throughput at the cost of more on-heap memory.
```properties
systems.eh-system.eventhubs.receive.queue.size = 10
```
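To show what the queue size bounds, the conceptual sketch below uses a per-partition ArrayBlockingQueue whose capacity plays the role of eventhubs.receive.queue.size. This is not Samza's implementation, and PartitionBufferSketch is a hypothetical name. In this model, a full buffer rejects new events until the processing side drains it. The number of events held on the heap can therefore grow to roughly the queue size times the number of partitions being consumed.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Conceptual model of one partition's receive buffer (hypothetical class,
// not Samza's implementation). The capacity plays the role of
// systems.<system>.eventhubs.receive.queue.size.
class PartitionBufferSketch {

    private final BlockingQueue<String> buffer;

    PartitionBufferSketch(int queueSize) {
        this.buffer = new ArrayBlockingQueue<>(queueSize);
    }

    // Receiving side: returns false when the buffer is full, so the caller
    // has to wait before handing over more events for this partition.
    boolean offer(String event) {
        return buffer.offer(event);
    }

    // Processing side: returns the oldest buffered event, or null if none.
    String poll() {
        return buffer.poll();
    }

    public static void main(String[] args) {
        PartitionBufferSketch partition0 = new PartitionBufferSketch(10);
        int accepted = 0;
        for (int i = 0; i < 12; i++) {
            if (partition0.offer("event-" + i)) {
                accepted++;
            }
        }
        System.out.println("accepted " + accepted + " of 12"); // accepted 10 of 12
        System.out.println("next: " + partition0.poll());      // next: event-0
    }
}
```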
For the list of all configs, see the configuration table page.
The hello-samza project contains an example of a high-level job that consumes from and produces to Event Hubs using the ZooKeeper deployment model.
Let's get started by cloning the hello-samza project:
```bash
git clone https://git.apache.org/samza-hello-samza.git hello-samza
cd hello-samza
git checkout latest
```
The project comes with numerous examples; for this tutorial, we will pick the Azure application.
For our Azure application, we require ZooKeeper. The hello-samza project comes with a script called “grid” to help with the environment setup:
```bash
./bin/grid standalone
```
This command will download, install, and start ZooKeeper and Kafka. It will also check out the latest version of Samza and build it. All package files will be put in a sub-directory called “deploy” inside hello-samza's root folder.
If you get a complaint that JAVA_HOME is not set, then you'll need to set it to the path where Java is installed on your system.
Here are the configs you must set before building the project. Configure these in the src/main/config/azure-application-local-runner.properties file.
```properties
# Add your EventHubs input stream credentials here
systems.eventhubs.streams.input-stream.eventhubs.namespace=YOUR-STREAM-NAMESPACE
systems.eventhubs.streams.input-stream.eventhubs.entitypath=YOUR-ENTITY-NAME
systems.eventhubs.streams.input-stream.eventhubs.sas.keyname=YOUR-SAS-KEY-NAME
systems.eventhubs.streams.input-stream.eventhubs.sas.token=YOUR-SAS-KEY-TOKEN

# Add your EventHubs output stream credentials here
systems.eventhubs.streams.output-stream.eventhubs.namespace=YOUR-STREAM-NAMESPACE
systems.eventhubs.streams.output-stream.eventhubs.entitypath=YOUR-ENTITY-NAME
systems.eventhubs.streams.output-stream.eventhubs.sas.keyname=YOUR-SAS-KEY-NAME
systems.eventhubs.streams.output-stream.eventhubs.sas.token=YOUR-SAS-KEY-TOKEN
```
Optionally, you may also use the Azure Checkpoint Manager. Otherwise, comment out both of these lines:
```properties
# Azure Table Checkpoint Manager
task.checkpoint.factory=org.apache.samza.checkpoint.azure.AzureCheckpointManagerFactory
azure.storage.connect=YOUR-STORAGE-ACCOUNT-CONNECTION-STRING
```
With the environment setup complete, let us move on to building the hello-samza project. Execute the following commands:
```bash
mvn clean package
mkdir -p deploy/samza
tar -xvf ./target/hello-samza-0.14.0-SNAPSHOT-dist.tar.gz -C deploy/samza
```
We are now all set to deploy the application locally.
In order to run the application, we will use the run-azure-application script:
```bash
./deploy/samza/bin/run-azure-application.sh
```
The above command executes the helper script, which invokes the AzureZKLocalApplication main class to start the AzureApplication. This application filters out consumed messages that have no key, prints the remaining messages, and sends them to the configured output stream.
The messages consumed should be printed in the following format:
```text
Sending: Received Key: <KEY> Received Message: <VALUE>
```
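The filter, print, and send steps described above can be approximated in plain Java as follows. This is not the hello-samza code, which uses Samza's high-level API. KeyedMessageFilterSketch and its Message class are hypothetical, and the printed line follows the format shown in the sample output.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java approximation of the AzureApplication flow (hypothetical class).
class KeyedMessageFilterSketch {

    // Hypothetical message type with a nullable key.
    static final class Message {
        final String key;
        final String value;

        Message(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Builds the log line in the documented format.
    static String format(Message message) {
        return "Sending: Received Key: " + message.key + " Received Message: " + message.value;
    }

    // Drops messages without a key, prints the rest, and hands them to sendToOutput.
    static void process(List<Message> input, Consumer<Message> sendToOutput) {
        for (Message message : input) {
            if (message.key == null) {
                continue; // filtered out: no key
            }
            System.out.println(format(message));
            sendToOutput.accept(message);
        }
    }

    public static void main(String[] args) {
        List<Message> input = Arrays.asList(
                new Message("k1", "hello"),
                new Message(null, "dropped"),
                new Message("k2", "world"));
        List<Message> output = new ArrayList<>(); // stands in for the output stream
        process(input, output::add);
        System.out.println(output.size() + " message(s) forwarded");
    }
}
```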
This application can be shut down by terminating the run-azure-application script. We can use the grid script to tear down the local environment (Kafka and ZooKeeper):
```bash
bin/grid stop all
```