[pulsar-storm] pulsar-bolt: add option to pass producer-configuration (#4495)
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
index 5259b5b..8432b1c 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
@@ -27,6 +27,7 @@
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
@@ -42,6 +43,7 @@
import org.apache.storm.utils.TupleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static com.google.common.base.Preconditions.checkNotNull;
public class PulsarBolt extends BaseRichBolt implements IMetric {
/**
@@ -67,18 +69,30 @@
private volatile long messagesSent = 0;
private volatile long messageSizeSent = 0;
+ public PulsarBolt(PulsarBoltConfiguration pulsarBoltConf) {
+ this(pulsarBoltConf, PulsarClient.builder());
+ }
+
public PulsarBolt(PulsarBoltConfiguration pulsarBoltConf, ClientBuilder clientBuilder) {
- this.clientConf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData().clone();
- this.producerConf = new ProducerConfigurationData();
+ this(pulsarBoltConf, ((ClientBuilderImpl) clientBuilder).getClientConfigurationData().clone(),
+ new ProducerConfigurationData());
+ }
+
+ public PulsarBolt(PulsarBoltConfiguration pulsarBoltConf, ClientConfigurationData clientConf,
+ ProducerConfigurationData producerConf) {
+ checkNotNull(pulsarBoltConf, "bolt configuration can't be null");
+ checkNotNull(clientConf, "client configuration can't be null");
+ checkNotNull(producerConf, "producer configuration can't be null");
Objects.requireNonNull(pulsarBoltConf.getServiceUrl());
Objects.requireNonNull(pulsarBoltConf.getTopic());
Objects.requireNonNull(pulsarBoltConf.getTupleToMessageMapper());
-
+ this.pulsarBoltConf = pulsarBoltConf;
+ this.clientConf = clientConf;
+ this.producerConf = producerConf;
this.clientConf.setServiceUrl(pulsarBoltConf.getServiceUrl());
this.producerConf.setTopicName(pulsarBoltConf.getTopic());
- this.pulsarBoltConf = pulsarBoltConf;
}
-
+
@SuppressWarnings({ "rawtypes" })
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {