blob: fdc5bc6c0579e07af8470a14c0154447ff239dcb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package samza.examples.cookbook;
import com.couchbase.client.java.document.json.JsonObject;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.samza.SamzaException;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.context.Context;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.serializers.StringSerde;
import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
import org.apache.samza.table.descriptors.RemoteTableDescriptor;
import org.apache.samza.table.remote.NoOpTableReadFunction;
import org.apache.samza.table.remote.RemoteTable;
import org.apache.samza.table.remote.couchbase.CouchbaseTableWriteFunction;
import org.apache.samza.table.retry.TableRetryPolicy;
/**
* This is a simple word count example using a remote store.
*
* In this example, we use Couchbase to demonstrate how to invoke APIs on a remote store other than get, put or delete
* as defined in {@link org.apache.samza.table.remote.AsyncRemoteTable}. Input messages are collected from the user
* through a Kafka console producer and tokenized on spaces. For each word, we increment a counter for this word
* as well as a counter for all words on Couchbase. We also output the current value of both counters to the Kafka
* console consumer.
*
* A rate limit of 4 requests/second to Couchbase is set for the entire job. Internally, Samza uses an embedded
* rate limiter, which evenly distributes the total rate limit among tasks. As we invoke 2 calls on Couchbase
* for each word, you should see roughly 2 messages per second in the Kafka console consumer
* window.
*
* A retry policy with 1 second fixed backoff time and max 3 retries is attached to the remote table.
*
* <p> Concepts covered: remote table, rate limiter, retry, arbitrary operation on remote store.
*
* To run the below example:
*
* <ol>
* <li>
* Create a Couchbase instance using docker; Log into the admin UI at http://localhost:8091 (Administrator/password) <br/>
* create a bucket called "my-bucket" <br/>
* Under Security tab, create a user with the same name, set 123456 as the password, and give it "Data Reader"
* and "Data Writer" privilege for this bucket. <br/>
* More information can be found at https://docs.couchbase.com/server/current/getting-started/do-a-quick-install.html
* </li>
* <li>
* Create Kafka topics "word-input" and "count-output" <br/>
* ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic word-input --partitions 2 --replication-factor 1
* ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic count-output --partitions 2 --replication-factor 1
* </li>
* <li>
* Run the application using the run-app.sh script <br/>
* ./deploy/samza/bin/run-app.sh --config-path=$PWD/deploy/samza/config/couchbase-table-example.properties
* </li>
* <li>
* Consume messages from the output topic <br/>
* ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic count-output
* </li>
* <li>
* Produce some messages to the input topic <br/>
* ./deploy/kafka/bin/kafka-console-producer.sh --topic word-input --broker-list localhost:9092
*
* After the console producer is started, type
* 1
* 2
* 3
* 4
* 5
* 4
* 3
* 2
* 1
*
* You should see messages like below from the console consumer window
*
* 2019-05-23 21:18:07 2019-05-23 21:18:07 word=2, count=1, total-count=1
* 2019-05-23 21:18:07 2019-05-23 21:18:07 word=1, count=1, total-count=2
* 2019-05-23 21:18:07 2019-05-23 21:18:07 word=4, count=1, total-count=3
* 2019-05-23 21:18:07 2019-05-23 21:18:07 word=3, count=1, total-count=4
* 2019-05-23 21:18:08 2019-05-23 21:18:08 word=4, count=2, total-count=5
* 2019-05-23 21:18:08 2019-05-23 21:18:08 word=5, count=1, total-count=6
* 2019-05-23 21:18:09 2019-05-23 21:18:09 word=2, count=2, total-count=7
* 2019-05-23 21:18:09 2019-05-23 21:18:09 word=3, count=2, total-count=8
* 2019-05-23 21:18:10 2019-05-23 21:18:10 word=1, count=2, total-count=9
*
* You can examine the result on Couchbase Admin GUI as well.
*
* Note:
* - If you enter "1 2 3 4 5 4 3 2 1", you should see roughly 1 QPS as
* the input is processed by only one task
*
*
* </li>
* </ol>
*
*/
public class CouchbaseTableExample implements StreamApplication {

  /** Pattern of the timestamps that are prepended to the output messages. */
  private static final String DATE_PATTERN = "yyyy-MM-dd HH:mm:ss";

  /**
   * Per-thread timestamp formatter. {@link SimpleDateFormat} keeps mutable state and is not
   * thread-safe, so a single shared static instance could produce corrupted output if two threads
   * formatted at the same time. Each thread lazily gets its own instance instead.
   */
  private static final ThreadLocal<SimpleDateFormat> DATE_FORMAT =
      ThreadLocal.withInitial(() -> new SimpleDateFormat(DATE_PATTERN));

  private static final String KAFKA_SYSTEM_NAME = "kafka";
  private static final List<String> KAFKA_CONSUMER_ZK_CONNECT = ImmutableList.of("localhost:2181");
  private static final List<String> KAFKA_PRODUCER_BOOTSTRAP_SERVERS = ImmutableList.of("localhost:9092");
  private static final Map<String, String> KAFKA_DEFAULT_STREAM_CONFIGS = ImmutableMap.of("replication.factor", "1");

  private static final String INPUT_STREAM_ID = "word-input";
  private static final String OUTPUT_STREAM_ID = "count-output";

  private static final String CLUSTER_NODES = "couchbase://127.0.0.1";
  private static final int COUCHBASE_PORT = 11210;
  private static final String BUCKET_NAME = "my-bucket";
  private static final String BUCKET_PASSWORD = "123456";

  /** Id of the remote table; shared by the descriptor and by the lookup in {@link MyCountFunction}. */
  private static final String TABLE_ID = "couchbase-table";

  /** Document id of the counter that is incremented once for every word. */
  private static final String TOTAL_COUNT_ID = "total-count";

  /**
   * Wires the job: words read from the Kafka input stream are counted in Couchbase through a
   * remote table, and the resulting counter values are written to the Kafka output stream.
   *
   * @param app descriptor on which the system, streams, table and operators are registered
   */
  @Override
  public void describe(StreamApplicationDescriptor app) {

    KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor(KAFKA_SYSTEM_NAME)
        .withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT)
        .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS)
        .withDefaultStreamConfigs(KAFKA_DEFAULT_STREAM_CONFIGS);

    KafkaInputDescriptor<String> wordInputDescriptor =
        kafkaSystemDescriptor.getInputDescriptor(INPUT_STREAM_ID, new StringSerde());

    KafkaOutputDescriptor<String> countOutputDescriptor =
        kafkaSystemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID, new StringSerde());

    // The bucket name doubles as the Couchbase user name.
    MyCouchbaseTableWriteFunction writeFn = new MyCouchbaseTableWriteFunction(BUCKET_NAME, CLUSTER_NODES)
        .withBootstrapCarrierDirectPort(COUCHBASE_PORT)
        .withUsernameAndPassword(BUCKET_NAME, BUCKET_PASSWORD)
        .withTimeout(Duration.ofSeconds(5));

    // Fixed 1 second backoff between attempts, giving up after 3 attempts.
    TableRetryPolicy retryPolicy = new TableRetryPolicy()
        .withFixedBackoff(Duration.ofSeconds(1))
        .withStopAfterAttempts(3);

    // This example never reads from the table, so a no-op read function is plugged in and the read
    // rate limiter is disabled; only the write path is retried and rate limited.
    RemoteTableDescriptor couchbaseTableDescriptor = new RemoteTableDescriptor(TABLE_ID)
        .withReadFunction(new NoOpTableReadFunction())
        .withReadRateLimiterDisabled()
        .withWriteFunction(writeFn)
        .withWriteRetryPolicy(retryPolicy)
        .withWriteRateLimit(4);

    app.withDefaultSystem(kafkaSystemDescriptor);
    MessageStream<String> wordStream = app.getInputStream(wordInputDescriptor);
    OutputStream<String> countStream = app.getOutputStream(countOutputDescriptor);

    // The returned table object is not used here; MyCountFunction looks the table up by id in init().
    app.getTable(couchbaseTableDescriptor);

    wordStream
        .flatMap(m -> Arrays.asList(m.split(" ")))
        // Consecutive spaces produce empty tokens; drop them.
        .filter(word -> word != null && !word.isEmpty())
        .map(new MyCountFunction())
        // MyCountFunction already starts its result with a timestamp, so the emitted line carries two.
        .map(countString -> currentTime() + " " + countString)
        .sendTo(countStream);
  }

  /**
   * Increments the per-word counter and the total counter on Couchbase for each incoming word and
   * renders both resulting values as a text line.
   */
  static class MyCountFunction implements MapFunction<String, String> {

    private MyCouchbaseTableWriteFunction writeFn;

    /**
     * Resolves the remote table by id from the task context and keeps a reference to its write
     * function, which exposes the counter operation.
     *
     * @param context context from which the task-level table is obtained
     */
    @Override
    public void init(Context context) {
      RemoteTable table = (RemoteTable) context.getTaskContext().getTable(TABLE_ID);
      writeFn = (MyCouchbaseTableWriteFunction) table.getWriteFunction();
    }

    /**
     * Issues both counter increments first and only then waits for their results.
     *
     * @param word the word to count
     * @return a line of the form {@code <time> word=<word>, count=<n>, total-count=<m>}
     */
    @Override
    public String apply(String word) {
      CompletableFuture<Long> countFuture = writeFn.incCounter(word);
      CompletableFuture<Long> totalCountFuture = writeFn.incCounter(TOTAL_COUNT_ID);
      // join() blocks until each increment has completed (or rethrows its failure).
      return String.format("%s word=%s, count=%d, total-count=%d",
          currentTime(), word, countFuture.join(), totalCountFuture.join());
    }
  }

  /**
   * Couchbase write function that adds a custom "counter" operation on top of the operations
   * provided by {@link CouchbaseTableWriteFunction}.
   */
  static class MyCouchbaseTableWriteFunction extends CouchbaseTableWriteFunction<JsonObject> {

    /** Operation id of the counter operation understood by {@link #writeAsync(int, Object...)}. */
    private static final int OP_COUNTER = 1;

    /**
     * @param bucketName name of the Couchbase bucket
     * @param clusterNodes addresses of the Couchbase cluster nodes
     */
    public MyCouchbaseTableWriteFunction(String bucketName, String... clusterNodes) {
      super(bucketName, JsonObject.class, clusterNodes);
    }

    /**
     * Executes a custom write operation selected by {@code opId}. Only {@link #OP_COUNTER} is
     * supported; it expects the document id (String) and the delta (int) as arguments.
     *
     * @param opId id of the operation to execute
     * @param args operation arguments; for the counter operation exactly {@code (String id, int delta)}
     * @return future holding the result of the operation
     * @throws IllegalArgumentException if the counter operation is not given exactly two arguments
     * @throws SamzaException if {@code opId} is not a known operation
     */
    @Override
    public <T> CompletableFuture<T> writeAsync(int opId, Object... args) {
      switch (opId) {
        case OP_COUNTER:
          // The template is only rendered when the check fails; a null array is rejected the same way.
          Preconditions.checkArgument(args != null && args.length == 2,
              "Two arguments (String and int) are expected for counter operation (opId=%s)", opId);
          String id = (String) args[0];
          int delta = (int) args[1];
          // NOTE(review): the literal 1 is presumably the initial value used when the counter
          // document does not exist yet - confirm against the Couchbase SDK version in use.
          return asyncWriteHelper(
              bucket.async().counter(id, delta, 1, timeout.toMillis(), TimeUnit.MILLISECONDS),
              String.format("Failed to invoke counter with Id %s from bucket %s.", id, bucketName),
              false);
        default:
          throw new SamzaException("Unknown opId: " + opId);
      }
    }

    /**
     * Increments the counter stored under {@code id} by one.
     *
     * @param id document id of the counter
     * @return future holding the counter value after the increment
     */
    public CompletableFuture<Long> incCounter(String id) {
      // Goes through the inherited table reference instead of calling writeAsync on this object
      // directly - presumably so the table-level rate limit and retry policy apply; verify.
      return table.writeAsync(OP_COUNTER, id, 1);
    }
  }

  /**
   * @return the current time formatted with {@link #DATE_PATTERN}
   */
  private static String currentTime() {
    return DATE_FORMAT.get().format(new Date());
  }
}