SAMZA-2218: Add a Couchbase example to samza-hello-samza (#64)

SAMZA-2218: Add a Couchbase example
diff --git a/pom.xml b/pom.xml
index c161027..a2064cf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -288,6 +288,7 @@
               <exclude>.git/**</exclude>
               <exclude>*.md</exclude>
               <exclude>docs/**</exclude>
+              <exclude>out/**</exclude>
               <exclude>config/**</exclude>
               <exclude>bin/**</exclude>
               <exclude>.gitignore</exclude>
diff --git a/src/main/config/couchbase-table-example.properties b/src/main/config/couchbase-table-example.properties
new file mode 100644
index 0000000..0bbfdea
--- /dev/null
+++ b/src/main/config/couchbase-table-example.properties
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Application / Job
+app.class=samza.examples.cookbook.CouchbaseTableExample
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=couchbase-table-example
+job.container.count=2
+
+# YARN
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
diff --git a/src/main/java/samza/examples/cookbook/CouchbaseTableExample.java b/src/main/java/samza/examples/cookbook/CouchbaseTableExample.java
new file mode 100644
index 0000000..4923110
--- /dev/null
+++ b/src/main/java/samza/examples/cookbook/CouchbaseTableExample.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package samza.examples.cookbook;
+
+import com.couchbase.client.java.document.json.JsonObject;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.text.SimpleDateFormat;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.samza.SamzaException;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
+import org.apache.samza.context.Context;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
+import org.apache.samza.table.descriptors.RemoteTableDescriptor;
+import org.apache.samza.table.remote.DummyTableReadFunction;
+import org.apache.samza.table.remote.RemoteTable;
+import org.apache.samza.table.remote.couchbase.CouchbaseTableWriteFunction;
+import org.apache.samza.table.retry.TableRetryPolicy;
+
+
+/**
+ * This is a simple word count example using a remote store.
+ *
+ * In this example, we use Couchbase to demonstrate how to invoke APIs on a remote store other than get, put or delete
+ * as defined in {@link org.apache.samza.table.remote.AsyncRemoteTable}. Input messages are collected from user through
+ * a Kafka console producer, and tokenized using space. For each word, we increment a counter for this word
+ * as well as a counter for all words on Couchbase. We also output the current value of both counters to Kafka console
+ * consumer.
+ *
+ * A rate limit of 4 requests/second to Couchbase is set for the entire job; internally, Samza uses an embedded
+ * rate limiter, which evenly distributes the total rate limit among tasks. As we invoke 2 calls on Couchbase
+ * for each word, you should see roughly 2 messages per second in the Kafka console consumer
+ * window.
+ *
+ * A retry policy with 1 second fixed backoff time and max 3 retries is attached to the remote table.
+ *
+ * <p> Concepts covered: remote table, rate limiter, retry, arbitrary operation on remote store.
+ *
+ * To run the below example:
+ *
+ * <ol>
+ *   <li>
+ *     Create a Couchbase instance using docker; Log into the admin UI at http://localhost:8091 (Administrator/password) <br/>
+ *     create a bucket called "my-bucket" <br/>
+ *     Under Security tab, create a user with the same name, set 123456 as the password, and give it "Data Reader"
+ *     and "Data Writer" privileges for this bucket. <br/>
+ *     More information can be found at https://docs.couchbase.com/server/current/getting-started/do-a-quick-install.html
+ *   </li>
+ *   <li>
+ *     Create Kafka topics "word-input" and "count-output" <br/>
+ *     ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic word-input --partitions 2 --replication-factor 1
+ *     ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic count-output --partitions 2 --replication-factor 1
+ *   </li>
+ *   <li>
+ *     Run the application using the run-app.sh script <br/>
+ *     ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/couchbase-table-example.properties
+ *   </li>
+ *   <li>
+ *     Consume messages from the output topic <br/>
+ *     ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic count-output
+ *   </li>
+ *   <li>
+ *     Produce some messages to the input topic <br/>
+ *     ./deploy/kafka/bin/kafka-console-producer.sh --topic word-input --broker-list localhost:9092
+ *
+ *     After the console producer is started, type
+ *     1
+ *     2
+ *     3
+ *     4
+ *     5
+ *     4
+ *     3
+ *     2
+ *     1
+ *
+ *     You should see messages like below from the console consumer window
+ *
+ *     2019-05-23 21:18:07 2019-05-23 21:18:07 word=2, count=1, total-count=1
+ *     2019-05-23 21:18:07 2019-05-23 21:18:07 word=1, count=1, total-count=2
+ *     2019-05-23 21:18:07 2019-05-23 21:18:07 word=4, count=1, total-count=3
+ *     2019-05-23 21:18:07 2019-05-23 21:18:07 word=3, count=1, total-count=4
+ *     2019-05-23 21:18:08 2019-05-23 21:18:08 word=4, count=2, total-count=5
+ *     2019-05-23 21:18:08 2019-05-23 21:18:08 word=5, count=1, total-count=6
+ *     2019-05-23 21:18:09 2019-05-23 21:18:09 word=2, count=2, total-count=7
+ *     2019-05-23 21:18:09 2019-05-23 21:18:09 word=3, count=2, total-count=8
+ *     2019-05-23 21:18:10 2019-05-23 21:18:10 word=1, count=2, total-count=9
+ *
+ *     You can examine the result on Couchbase Admin GUI as well.
+ *
+ *     Note:
+ *       - If you enter "1 2 3 4 5 4 3 2 1", you should see roughly 1 QPS as
+ *         the input is processed by only one task
+ *
+ *
+ *   </li>
+ * </ol>
+ *
+ */
+public class CouchbaseTableExample implements StreamApplication {
+
+  private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+  private static final String KAFKA_SYSTEM_NAME = "kafka";
+  private static final List<String> KAFKA_CONSUMER_ZK_CONNECT = ImmutableList.of("localhost:2181");
+  private static final List<String> KAFKA_PRODUCER_BOOTSTRAP_SERVERS = ImmutableList.of("localhost:9092");
+  private static final Map<String, String> KAFKA_DEFAULT_STREAM_CONFIGS = ImmutableMap.of("replication.factor", "1");
+
+  private static final String INPUT_STREAM_ID = "word-input";
+  private static final String OUTPUT_STREAM_ID = "count-output";
+
+  private static final String CLUSTER_NODES = "couchbase://127.0.0.1";
+  private static final int COUCHBASE_PORT = 11210;
+  private static final String BUCKET_NAME = "my-bucket";
+  private static final String BUCKET_PASSWORD = "123456";
+  private static final String TOTAL_COUNT_ID = "total-count";
+
+  @Override
+  public void describe(StreamApplicationDescriptor app) {
+
+    KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor(KAFKA_SYSTEM_NAME)
+        .withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT)
+        .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS)
+        .withDefaultStreamConfigs(KAFKA_DEFAULT_STREAM_CONFIGS);
+
+    KafkaInputDescriptor<String> wordInputDescriptor =
+        kafkaSystemDescriptor.getInputDescriptor(INPUT_STREAM_ID, new StringSerde());
+
+    KafkaOutputDescriptor<String> countOutputDescriptor =
+        kafkaSystemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID, new StringSerde());
+
+    MyCouchbaseTableWriteFunction writeFn = new MyCouchbaseTableWriteFunction(BUCKET_NAME, CLUSTER_NODES)
+        .withBootstrapCarrierDirectPort(COUCHBASE_PORT)
+        .withUsernameAndPassword(BUCKET_NAME, BUCKET_PASSWORD)
+        .withTimeout(Duration.ofSeconds(5));
+
+    TableRetryPolicy retryPolicy = new TableRetryPolicy()
+        .withFixedBackoff(Duration.ofSeconds(1))
+        .withStopAfterAttempts(3);
+
+    RemoteTableDescriptor couchbaseTableDescriptor = new RemoteTableDescriptor("couchbase-table")
+        .withReadFunction(new DummyTableReadFunction())
+        .withReadRateLimiterDisabled()
+        .withWriteFunction(writeFn)
+        .withWriteRetryPolicy(retryPolicy)
+        .withWriteRateLimit(4);
+
+    app.withDefaultSystem(kafkaSystemDescriptor);
+    MessageStream<String> wordStream = app.getInputStream(wordInputDescriptor);
+    OutputStream<String> countStream = app.getOutputStream(countOutputDescriptor);
+    app.getTable(couchbaseTableDescriptor);
+
+    wordStream
+        .flatMap(m -> Arrays.asList(m.split(" ")))
+        .filter(word -> word != null && word.length() > 0)
+        .map(new MyCountFunction())
+        .map(countString -> currentTime() + " " + countString)
+        .sendTo(countStream);
+  }
+
+  static class MyCountFunction implements MapFunction<String, String> {
+
+    private MyCouchbaseTableWriteFunction writeFn;
+
+    @Override
+    public void init(Context context) {
+      RemoteTable table = (RemoteTable) context.getTaskContext().getTable("couchbase-table");
+      writeFn = (MyCouchbaseTableWriteFunction) table.getWriteFunction();
+    }
+
+    @Override
+    public String apply(String word) {
+      CompletableFuture<Long> countFuture = writeFn.incCounter(word);
+      CompletableFuture<Long> totalCountFuture = writeFn.incCounter(TOTAL_COUNT_ID);
+      return String.format("%s word=%s, count=%d, total-count=%d",
+          currentTime(), word, countFuture.join(), totalCountFuture.join());
+    }
+  }
+
+  static class MyCouchbaseTableWriteFunction extends CouchbaseTableWriteFunction<JsonObject> {
+
+    private final static int OP_COUNTER = 1;
+
+    public MyCouchbaseTableWriteFunction(String bucketName, String... clusterNodes) {
+      super(bucketName, JsonObject.class, clusterNodes);
+    }
+
+    @Override
+    public <T> CompletableFuture<T> writeAsync(int opId, Object... args) {
+      switch (opId) {
+        case OP_COUNTER:
+          Preconditions.checkArgument(2 == args.length,
+              String.format("Two arguments (String and int) are expected for counter operation (opId=%d)", opId));
+          String id = (String) args[0];
+          int delta = (int) args[1];
+          return asyncWriteHelper(
+              bucket.async().counter(id, delta, 1, timeout.toMillis(), TimeUnit.MILLISECONDS),
+              String.format("Failed to invoke counter with Id %s from bucket %s.", id, bucketName),
+              false);
+        default:
+          throw new SamzaException("Unknown opId: " + opId);
+      }
+    }
+
+    public CompletableFuture<Long> incCounter(String id) {
+      return table.writeAsync(OP_COUNTER, id, 1);
+    }
+
+  }
+
+  private static String currentTime() {
+    return DATE_FORMAT.format(new Date());
+  }
+
+}