Pulsar StormはApache Storm Topologyと連携するためのアダプタであり、データの送受信を行うためのStromの実装を提供します。
アプリケーションはPulsar Spoutを通してStorm Topologyにデータを注入したり、Pulsar Boltを通してStorm Topologyからデータを取り出すことができます。
Pulsar Stormアダプタの依存をincludeします:
<dependency> <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-storm</artifactId> <version>${pulsar.version}</version> </dependency>
Pulsar Spoutを使って、Storm TopologyはPulsarのトピックに発行されたデータをconsumeできます。Pulsar Spoutは受信したメッセージとクライアントが提供するMessageToValuesMapperをもとにStorm Tupleを発行します。
下流のBoltでの処理に失敗したTupleはSpoutから再注入されます。この再注入は指数バックオフアルゴリズムに従い、設定可能なタイムアウト (デフォルト: 60秒) または設定可能なリトライ回数のどちらかに達するまで行われます。その後、Consumerにより処理成功の応答が返されます。
// Pulsarクライアントの設定 ClientConfiguration clientConf = new ClientConfiguration(); // Pulsar Consumerの設定 ConsumerConfiguration consumerConf = new ConsumerConfiguration(); @SuppressWarnings("serial") MessageToValuesMapper messageToValuesMapper = new MessageToValuesMapper() { @Override public Values toValues(Message msg) { return new Values(new String(msg.getData())); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // 出力フィールドを宣言 declarer.declare(new Fields("string")); } }; // Pulsar Spoutの設定 PulsarSpoutConfiguration spoutConf = new PulsarSpoutConfiguration(); spoutConf.setServiceUrl("http://broker.messaging.usw.example.com:8080"); spoutConf.setTopic("persistent://my-property/usw/my-ns/my-topic1"); spoutConf.setSubscriptionName("my-subscriber-name1"); spoutConf.setMessageToValuesMapper(messageToValuesMapper); // Pulsar Spoutの作成 PulsarSpout spout = new PulsarSpout(spoutConf, clientConf, consumerConf);
Pulsar Boltを使って、Storm Topology内のデータをPulsarのトピックに発行できます。Pulsar Boltは受信したStorm Tupleとクライアントが提供するTupleToMessageMapperをもとにメッセージを発行します。
異なるトピックのメッセージを発行するためにパーティションドトピックを利用できます。その場合TupleToMessageMapperの実装において、メッセージに「キー」を用意する必要があります。同じキーを持つメッセージは同じトピックに送信されるようになります。
// Pulsarクライアントの設定 ClientConfiguration clientConf = new ClientConfiguration(); // Pulsar Producerの設定 ProducerConfiguration producerConf = new ProducerConfiguration(); @SuppressWarnings("serial") TupleToMessageMapper tupleToMessageMapper = new TupleToMessageMapper() { @Override public Message toMessage(Tuple tuple) { String receivedMessage = tuple.getString(0); // メッセージを処理 String processedMsg = receivedMessage + "-processed"; return MessageBuilder.create().setContent(processedMsg.getBytes()).build(); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // 出力フィールドを宣言 } }; // Pulsar Boltの設定 PulsarBoltConfiguration boltConf = new PulsarBoltConfiguration(); boltConf.setServiceUrl("http://broker.messaging.usw.example.com:8080"); boltConf.setTopic("persistent://my-property/usw/my-ns/my-topic2"); boltConf.setTupleToMessageMapper(tupleToMessageMapper); // Pulsar Boltの作成 PulsarBolt bolt = new PulsarBolt(boltConf, clientConf);
完全な実装の例はStormExample.javaを参照してください。