Update Event Hubs Example
Update the Event Hubs Connector example for the 1.0 release.
Author: Daniel Chen <dchen1@linkedin.com>
Reviewers: Prateek Maheshwari
Closes #38 from dxichen/latest
diff --git a/bin/run-azure-application.sh b/bin/run-event-hubs-zk-application.sh
similarity index 100%
rename from bin/run-azure-application.sh
rename to bin/run-event-hubs-zk-application.sh
diff --git a/src/main/assembly/src.xml b/src/main/assembly/src.xml
index 8f3694e..1614aaf 100644
--- a/src/main/assembly/src.xml
+++ b/src/main/assembly/src.xml
@@ -49,7 +49,7 @@
<outputDirectory>bin</outputDirectory>
</file>
<file>
- <source>${basedir}/bin/run-azure-application.sh</source>
+ <source>${basedir}/bin/run-event-hubs-zk-application.sh</source>
<outputDirectory>bin</outputDirectory>
</file>
</files>
diff --git a/src/main/config/azure-application-local-runner.properties b/src/main/config/azure-application-local-runner.properties
index e440fd8..3b7618e 100644
--- a/src/main/config/azure-application-local-runner.properties
+++ b/src/main/config/azure-application-local-runner.properties
@@ -21,29 +21,9 @@
job.default.system=eventhubs
job.coordinator.zk.connect=localhost:2181
-# Azure EventHubs System
-systems.eventhubs.samza.factory=org.apache.samza.system.eventhub.EventHubSystemFactory
-systems.eventhubs.stream.list=output-stream,input-stream
-
-# Add your EventHubs input stream credentials here
-systems.eventhubs.streams.input-stream.eventhubs.namespace=YOUR-STREAM-NAMESPACE
-systems.eventhubs.streams.input-stream.eventhubs.entitypath=YOUR-ENTITY-NAME
-systems.eventhubs.streams.input-stream.eventhubs.sas.keyname=YOUR-SAS-KEY-NAME
-systems.eventhubs.streams.input-stream.eventhubs.sas.token=YOUR-SAS-KEY-TOKEN
-
-# Add your EventHubs output stream credentials here
-systems.eventhubs.streams.output-stream.eventhubs.namespace=YOUR-STREAM-NAMESPACE
-systems.eventhubs.streams.output-stream.eventhubs.entitypath=YOUR-ENTITY-NAME
-systems.eventhubs.streams.output-stream.eventhubs.sas.keyname=YOUR-SAS-KEY-NAME
-systems.eventhubs.streams.output-stream.eventhubs.sas.token=YOUR-SAS-KEY-TOKEN
-
-# Azure Table Checkpoint Manager
-task.checkpoint.factory=org.apache.samza.checkpoint.azure.AzureCheckpointManagerFactory
-azure.storage.connect=YOUR-STORAGE-ACCOUNT-CONNECTION-STRING
+# Define the SAS key name and SAS token under property names of your choice, prefixed with 'sensitive.'
+sensitive.eventhubs.sas.key.name=my-sas-key-name
+sensitive.eventhubs.sas.token=my-sas-token
# Task/Application
task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory
-
-# Streams
-streams.input-stream.samza.system=eventhubs
-streams.output-stream.samza.system=eventhubs
diff --git a/src/main/java/samza/examples/azure/AzureApplication.java b/src/main/java/samza/examples/azure/AzureApplication.java
index e2c337f..454787f 100644
--- a/src/main/java/samza/examples/azure/AzureApplication.java
+++ b/src/main/java/samza/examples/azure/AzureApplication.java
@@ -24,39 +24,60 @@
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
-import org.apache.samza.serializers.ByteSerde;
-import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.descriptors.GenericInputDescriptor;
-import org.apache.samza.system.descriptors.GenericOutputDescriptor;
-import org.apache.samza.system.descriptors.GenericSystemDescriptor;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.eventhub.descriptors.EventHubsInputDescriptor;
+import org.apache.samza.system.eventhub.descriptors.EventHubsOutputDescriptor;
+import org.apache.samza.system.eventhub.descriptors.EventHubsSystemDescriptor;
+
public class AzureApplication implements StreamApplication {
+ // Stream names
private static final String INPUT_STREAM_ID = "input-stream";
private static final String OUTPUT_STREAM_ID = "output-stream";
+ // These properties could be configured here or in azure-application-local-runner.properties
+ // Keep in mind that the .properties file will overwrite properties defined here with Descriptors
+ private static final String EVENTHUBS_NAMESPACE = "my-eventhubs-namespace";
+
+ // Upstream and downstream Event Hubs entity names
+ private static final String EVENTHUBS_INPUT_ENTITY = "my-input-entity";
+ private static final String EVENTHUBS_OUTPUT_ENTITY = "my-output-entity";
+
+ // You may define your own config properties in azure-application-local-runner.properties and retrieve them
+ // in the StreamApplicationDescriptor. Prefix them with 'sensitive.' to avoid logging them.
+ private static final String EVENTHUBS_SAS_KEY_NAME_CONFIG = "sensitive.eventhubs.sas.key.name";
+ private static final String EVENTHUBS_SAS_KEY_TOKEN_CONFIG = "sensitive.eventhubs.sas.token";
+
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
- GenericSystemDescriptor systemDescriptor =
- new GenericSystemDescriptor("eventhubs", "org.apache.samza.system.eventhub.EventHubSystemFactory");
+ // Define your system here
+ EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor("eventhubs");
- KVSerde<String, byte[]> serde = KVSerde.of(new StringSerde(), new ByteSerde());
+ // Choose your serializer/deserializer for the EventData payload
+ StringSerde serde = new StringSerde();
- GenericInputDescriptor<KV<String, byte[]>> inputDescriptor =
- systemDescriptor.getInputDescriptor(INPUT_STREAM_ID, serde);
+ // Define the input and output descriptors with respective configs
+ EventHubsInputDescriptor<KV<String, String>> inputDescriptor =
+ systemDescriptor.getInputDescriptor(INPUT_STREAM_ID, EVENTHUBS_NAMESPACE, EVENTHUBS_INPUT_ENTITY, serde)
+ .withSasKeyName(appDescriptor.getConfig().get(EVENTHUBS_SAS_KEY_NAME_CONFIG))
+ .withSasKey(appDescriptor.getConfig().get(EVENTHUBS_SAS_KEY_TOKEN_CONFIG));
- GenericOutputDescriptor<KV<String, byte[]>> outputDescriptor =
- systemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID, serde);
+ EventHubsOutputDescriptor<KV<String, String>> outputDescriptor =
+ systemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID, EVENTHUBS_NAMESPACE, EVENTHUBS_OUTPUT_ENTITY, serde)
+ .withSasKeyName(appDescriptor.getConfig().get(EVENTHUBS_SAS_KEY_NAME_CONFIG))
+ .withSasKey(appDescriptor.getConfig().get(EVENTHUBS_SAS_KEY_TOKEN_CONFIG));
- MessageStream<KV<String, byte[]>> eventhubInput = appDescriptor.getInputStream(inputDescriptor);
- OutputStream<KV<String, byte[]>> eventhubOutput = appDescriptor.getOutputStream(outputDescriptor);
+ // Define the input and output streams with descriptors
+ MessageStream<KV<String, String>> eventhubInput = appDescriptor.getInputStream(inputDescriptor);
+ OutputStream<KV<String, String>> eventhubOutput = appDescriptor.getOutputStream(outputDescriptor);
+ // Define the execution flow with the high-level API
eventhubInput
- .filter((message) -> message.getKey() != null)
.map((message) -> {
System.out.println("Sending: ");
System.out.println("Received Key: " + message.getKey());
- System.out.println("Received Message: " + new String(message.getValue()));
+ System.out.println("Received Message: " + message.getValue());
return message;
})
.sendTo(eventhubOutput);