Update Event Hubs Example

Update the Event Hubs Connector example for 1.0 release.

Author: Daniel Chen <dchen1@linkedin.com>

Reviewers: Prateek Maheshwari

Closes #38 from dxichen/latest
diff --git a/bin/run-azure-application.sh b/bin/run-event-hubs-zk-application.sh
similarity index 100%
rename from bin/run-azure-application.sh
rename to bin/run-event-hubs-zk-application.sh
diff --git a/src/main/assembly/src.xml b/src/main/assembly/src.xml
index 8f3694e..1614aaf 100644
--- a/src/main/assembly/src.xml
+++ b/src/main/assembly/src.xml
@@ -49,7 +49,7 @@
       <outputDirectory>bin</outputDirectory>
     </file>
     <file>
-      <source>${basedir}/bin/run-azure-application.sh</source>
+      <source>${basedir}/bin/run-event-hubs-zk-application.sh</source>
       <outputDirectory>bin</outputDirectory>
     </file>
   </files>
diff --git a/src/main/config/azure-application-local-runner.properties b/src/main/config/azure-application-local-runner.properties
index e440fd8..3b7618e 100644
--- a/src/main/config/azure-application-local-runner.properties
+++ b/src/main/config/azure-application-local-runner.properties
@@ -21,29 +21,9 @@
 job.default.system=eventhubs
 job.coordinator.zk.connect=localhost:2181
 
-# Azure EventHubs System
-systems.eventhubs.samza.factory=org.apache.samza.system.eventhub.EventHubSystemFactory
-systems.eventhubs.stream.list=output-stream,input-stream
-
-# Add your EventHubs input stream credentials here
-systems.eventhubs.streams.input-stream.eventhubs.namespace=YOUR-STREAM-NAMESPACE
-systems.eventhubs.streams.input-stream.eventhubs.entitypath=YOUR-ENTITY-NAME
-systems.eventhubs.streams.input-stream.eventhubs.sas.keyname=YOUR-SAS-KEY-NAME
-systems.eventhubs.streams.input-stream.eventhubs.sas.token=YOUR-SAS-KEY-TOKEN
-
-# Add your EventHubs output stream credentials here
-systems.eventhubs.streams.output-stream.eventhubs.namespace=YOUR-STREAM-NAMESPACE
-systems.eventhubs.streams.output-stream.eventhubs.entitypath=YOUR-ENTITY-NAME
-systems.eventhubs.streams.output-stream.eventhubs.sas.keyname=YOUR-SAS-KEY-NAME
-systems.eventhubs.streams.output-stream.eventhubs.sas.token=YOUR-SAS-KEY-TOKEN
-
-# Azure Table Checkpoint Manager
-task.checkpoint.factory=org.apache.samza.checkpoint.azure.AzureCheckpointManagerFactory
-azure.storage.connect=YOUR-STORAGE-ACCOUNT-CONNECTION-STRING
+# Define the key and name configurations with property names of your choice, starting with 'sensitive.'
+sensitive.eventhubs.sas.key.name=my-sas-key-name
+sensitive.eventhubs.sas.token=my-sas-token
 
 # Task/Application
 task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory
-
-# Streams
-streams.input-stream.samza.system=eventhubs
-streams.output-stream.samza.system=eventhubs
diff --git a/src/main/java/samza/examples/azure/AzureApplication.java b/src/main/java/samza/examples/azure/AzureApplication.java
index e2c337f..454787f 100644
--- a/src/main/java/samza/examples/azure/AzureApplication.java
+++ b/src/main/java/samza/examples/azure/AzureApplication.java
@@ -24,39 +24,60 @@
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
-import org.apache.samza.serializers.ByteSerde;
-import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.descriptors.GenericInputDescriptor;
-import org.apache.samza.system.descriptors.GenericOutputDescriptor;
-import org.apache.samza.system.descriptors.GenericSystemDescriptor;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.eventhub.descriptors.EventHubsInputDescriptor;
+import org.apache.samza.system.eventhub.descriptors.EventHubsOutputDescriptor;
+import org.apache.samza.system.eventhub.descriptors.EventHubsSystemDescriptor;
+
 
 public class AzureApplication implements StreamApplication {
+  // Stream names
   private static final String INPUT_STREAM_ID = "input-stream";
   private static final String OUTPUT_STREAM_ID = "output-stream";
 
+  // These properties could be configured here or in azure-application-local-runner.properties
+  // Keep in mind that the .properties file will be overwrite properties defined here with Descriptors
+  private static final String EVENTHUBS_NAMESPACE = "my-eventhubs-namespace";
+
+  // Upstream and downstream Event Hubs entity names
+  private static final String EVENTHUBS_INPUT_ENTITY = "my-input-entity";
+  private static final String EVENTHUBS_OUTPUT_ENTITY = "my-output-entity";
+
+  // You may define your own config properties in azure-application-local-runner.properties and retrieve them
+  // in the StreamApplicationDescriptor. Prefix them with 'sensitive.' to avoid logging them.
+  private static final String EVENTHUBS_SAS_KEY_NAME_CONFIG = "sensitive.eventhubs.sas.key.name";
+  private static final String EVENTHUBS_SAS_KEY_TOKEN_CONFIG = "sensitive.eventhubs.sas.token";
+
   @Override
   public void describe(StreamApplicationDescriptor appDescriptor) {
-    GenericSystemDescriptor systemDescriptor =
-        new GenericSystemDescriptor("eventhubs", "org.apache.samza.system.eventhub.EventHubSystemFactory");
+    // Define your system here
+    EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor("eventhubs");
 
-    KVSerde<String, byte[]> serde = KVSerde.of(new StringSerde(), new ByteSerde());
+    // Choose your serializer/deserializer for the EventData payload
+    StringSerde serde = new StringSerde();
 
-    GenericInputDescriptor<KV<String, byte[]>> inputDescriptor =
-        systemDescriptor.getInputDescriptor(INPUT_STREAM_ID, serde);
+    // Define the input and output descriptors with respective configs
+    EventHubsInputDescriptor<KV<String, String>> inputDescriptor =
+        systemDescriptor.getInputDescriptor(INPUT_STREAM_ID, EVENTHUBS_NAMESPACE, EVENTHUBS_INPUT_ENTITY, serde)
+            .withSasKeyName(appDescriptor.getConfig().get(EVENTHUBS_SAS_KEY_NAME_CONFIG))
+            .withSasKey(appDescriptor.getConfig().get(EVENTHUBS_SAS_KEY_TOKEN_CONFIG));
 
-    GenericOutputDescriptor<KV<String, byte[]>> outputDescriptor =
-        systemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID, serde);
+    EventHubsOutputDescriptor<KV<String, String>> outputDescriptor =
+        systemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID, EVENTHUBS_NAMESPACE, EVENTHUBS_OUTPUT_ENTITY, serde)
+            .withSasKeyName(appDescriptor.getConfig().get(EVENTHUBS_SAS_KEY_NAME_CONFIG))
+            .withSasKey(appDescriptor.getConfig().get(EVENTHUBS_SAS_KEY_TOKEN_CONFIG));
 
-    MessageStream<KV<String, byte[]>> eventhubInput = appDescriptor.getInputStream(inputDescriptor);
-    OutputStream<KV<String, byte[]>> eventhubOutput = appDescriptor.getOutputStream(outputDescriptor);
+    // Define the input and output streams with descriptors
+    MessageStream<KV<String, String>> eventhubInput = appDescriptor.getInputStream(inputDescriptor);
+    OutputStream<KV<String, String>> eventhubOutput = appDescriptor.getOutputStream(outputDescriptor);
 
+    // Define the execution flow with the high-level API
     eventhubInput
-        .filter((message) -> message.getKey() != null)
         .map((message) -> {
           System.out.println("Sending: ");
           System.out.println("Received Key: " + message.getKey());
-          System.out.println("Received Message: " + new String(message.getValue()));
+          System.out.println("Received Message: " + message.getValue());
           return message;
         })
         .sendTo(eventhubOutput);