[IO] updated aerospike-java-client version and fixed NPE (#6263)
* updated aerospike-java-client version
fixed the NPE for the Eventloop
* updated java client version
diff --git a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java
index fe3787a..7374f8a 100644
--- a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java
+++ b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java
@@ -71,14 +71,13 @@
writePolicy = new WritePolicy();
writePolicy.maxRetries = aerospikeSinkConfig.getRetries();
writePolicy.setTimeout(aerospikeSinkConfig.getTimeoutMs());
- createClient();
+ eventLoops = new NioEventLoops(new EventPolicy(), 1);
+ eventLoop = eventLoops.next();
+ createClient(eventLoops);
queue = new LinkedBlockingDeque<>(aerospikeSinkConfig.getMaxConcurrentRequests());
for (int i = 0; i < aerospikeSinkConfig.getMaxConcurrentRequests(); ++i) {
queue.put(new AWriteListener(queue));
}
-
- eventLoops = new NioEventLoops(new EventPolicy(), 1);
- eventLoop = eventLoops.next();
}
@Override
@@ -109,7 +108,7 @@
client.put(eventLoop, listener, writePolicy, key, bin);
}
- private void createClient() {
+ private void createClient(NioEventLoops eventLoops) {
String[] hosts = aerospikeSinkConfig.getSeedHosts().split(",");
if (hosts.length <= 0) {
throw new RuntimeException("Invalid Seed Hosts");
@@ -125,6 +124,7 @@
policy.user = aerospikeSinkConfig.getUserName();
policy.password = aerospikeSinkConfig.getPassword();
}
+ policy.eventLoops = eventLoops;
client = new AerospikeClient(policy, aeroSpikeHosts);
}