As introduced in "What is At Least Once Message Delivery", Gearpump has a built-in KafkaSource. To get at-least-once message delivery, users should deploy a Kafka cluster as the offset store along with the Gearpump cluster.
Here's an example of deploying a local Kafka cluster.
Download the latest Kafka from the official website and extract it to a local directory ($KAFKA_HOME).
Boot up the single-node Zookeeper instance packaged with Kafka.
$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
Start a Kafka broker.
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
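Before running the start commands above, it can help to confirm that the extracted directory contains the files those commands refer to. The following is a minimal sketch in POSIX sh; check_layout is a hypothetical helper written for this guide, not something shipped with Kafka or Gearpump.

```shell
#!/bin/sh
# check_layout HOME REL_PATH...
# Hypothetical helper: returns 0 if every REL_PATH exists under HOME.
# Otherwise prints each missing path to stderr and returns 1.
check_layout() {
  home=$1
  shift
  missing=0
  for rel in "$@"; do
    if [ ! -e "$home/$rel" ]; then
      echo "missing: $home/$rel" >&2
      missing=1
    fi
  done
  return "$missing"
}
```

For example, check_layout "$KAFKA_HOME" bin/zookeeper-server-start.sh bin/kafka-server-start.sh config/zookeeper.properties reports any of the three files that is absent, which usually means $KAFKA_HOME points at the wrong directory.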
When creating an offset store for KafkaSource, set the Zookeeper connect string to localhost:2181 and the broker list to localhost:9092 in KafkaStorageFactory.
val offsetStorageFactory = new KafkaStorageFactory("localhost:2181", "localhost:9092")
val source = new KafkaSource("topic1", "localhost:2181", offsetStorageFactory)
Exactly Once Message Delivery requires both an offset store and a checkpoint store. For the offset store, a Kafka cluster should be deployed as in the previous section. For the checkpoint store, Gearpump has built-in support for Hadoop file systems such as HDFS. Hence, users should deploy an HDFS cluster alongside the Gearpump cluster.
Here's an example of deploying a local HDFS cluster.
Download Hadoop 2.6 from the official website and extract it to a local directory ($HADOOP_HOME).
Add the following configuration to $HADOOP_HOME/etc/hadoop/core-site.xml.
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost:9000</value>
  </property>
</configuration>
Start HDFS. On a fresh installation, format the NameNode first with $HADOOP_HOME/bin/hdfs namenode -format.
$HADOOP_HOME/sbin/start-dfs.sh
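The core-site.xml step above can also be scripted, so the same URI is reused when the file is regenerated. This is a minimal sketch in POSIX sh; render_core_site is a hypothetical helper, and it assumes the URI contains no characters that need XML escaping.

```shell
#!/bin/sh
# render_core_site URI
# Hypothetical helper: prints a minimal core-site.xml whose fs.defaultFS
# is URI. The URI is inserted verbatim, with no XML escaping.
render_core_site() {
  cat <<EOF
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>$1</value>
  </property>
</configuration>
EOF
}
```

Running render_core_site hdfs://localhost:9000 > core-site.xml writes the file to the current directory; copy it to the configuration location named in the step above before starting HDFS.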
When creating a HadoopCheckpointStore, set the Hadoop configuration to match core-site.xml.
val hadoopConfig = new Configuration
hadoopConfig.set("fs.defaultFS", "hdfs://localhost:9000")
val checkpointStoreFactory = new HadoopCheckpointStoreFactory("MessageCount", hadoopConfig,
  // rotate on 1KB
  new FileSizeRotation(1000))
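Because the fs.defaultFS value set in code has to agree with the one in core-site.xml, a quick way to read the value back from the file may be useful. The sketch below is a sed-based approximation in POSIX sh, not a real XML parser: get_default_fs is a hypothetical helper, and it assumes the <value> element directly follows <name>fs.defaultFS</name> with only whitespace in between.

```shell
#!/bin/sh
# get_default_fs FILE
# Hypothetical helper: prints the <value> that directly follows
# <name>fs.defaultFS</name> in FILE, or nothing if no such pair exists.
# Newlines are removed first so the pattern also matches when the two
# elements sit on separate lines.
get_default_fs() {
  { tr -d '\n' < "$1"; echo; } |
    sed -n 's/.*<name>fs\.defaultFS<\/name>[[:space:]]*<value>\([^<]*\)<\/value>.*/\1/p'
}
```

Comparing the printed value with the string passed to hadoopConfig.set is a cheap sanity check before submitting an application.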