Upgrade to Mockito 2.x (#4671)
Upgrading to Mockito 2.28 and PowerMock 2.0. This a pre-step to be able to run CI with Java 11 / 12
diff --git a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileConsumerThreadTests.java b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileConsumerThreadTests.java
index a21633d..a4582bc 100644
--- a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileConsumerThreadTests.java
+++ b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileConsumerThreadTests.java
@@ -18,7 +18,7 @@
*/
package org.apache.pulsar.io.file;
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
diff --git a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileListingThreadTests.java b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileListingThreadTests.java
index 855498e..5be101f 100644
--- a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileListingThreadTests.java
+++ b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileListingThreadTests.java
@@ -19,7 +19,7 @@
package org.apache.pulsar.io.file;
import static org.testng.Assert.fail;
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
diff --git a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/ProcessedFileThreadTests.java b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/ProcessedFileThreadTests.java
index 98d35a9..9970a55 100644
--- a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/ProcessedFileThreadTests.java
+++ b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/ProcessedFileThreadTests.java
@@ -18,7 +18,7 @@
*/
package org.apache.pulsar.io.file;
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
diff --git a/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSinkTest.java b/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSinkTest.java
index fe8941e..a746f7e 100644
--- a/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSinkTest.java
+++ b/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSinkTest.java
@@ -19,8 +19,6 @@
package org.apache.pulsar.io.influxdb;
import com.google.common.collect.Maps;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
@@ -46,7 +44,7 @@
import java.util.Map;
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -151,7 +149,7 @@
verify(this.influxDB, times(1)).createDatabase("testDB");
doAnswer(invocationOnMock -> {
- BatchPoints batchPoints = invocationOnMock.getArgumentAt(0, BatchPoints.class);
+ BatchPoints batchPoints = invocationOnMock.getArgument(0, BatchPoints.class);
Assert.assertNotNull(batchPoints, "batchPoints should not be null.");
return null;
}).when(influxDB).write(any(BatchPoints.class));
diff --git a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java
index 3eb6f6e..7dc665d 100644
--- a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java
+++ b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java
@@ -18,21 +18,14 @@
*/
package org.apache.pulsar.io.mongodb;
+import static java.util.stream.Collectors.toList;
+
import com.google.common.collect.Lists;
import com.mongodb.MongoBulkWriteException;
import com.mongodb.async.client.MongoClient;
import com.mongodb.async.client.MongoClients;
import com.mongodb.async.client.MongoCollection;
import com.mongodb.async.client.MongoDatabase;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.Sink;
-import org.apache.pulsar.io.core.SinkContext;
-import org.apache.pulsar.io.core.annotations.Connector;
-import org.apache.pulsar.io.core.annotations.IOType;
-import org.bson.BSONException;
-import org.bson.Document;
-import org.bson.json.JsonParseException;
import java.nio.charset.Charset;
import java.util.ArrayList;
@@ -42,9 +35,19 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
import java.util.stream.IntStream;
-import static java.util.stream.Collectors.toList;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
+import org.bson.BSONException;
+import org.bson.Document;
+import org.bson.json.JsonParseException;
/**
* The base class for MongoDB sinks.
@@ -70,6 +73,15 @@
private ScheduledExecutorService flushExecutor;
+ private Supplier<MongoClient> clientProvider;
+
+ public MongoSink() {
+ this(null);
+ }
+
+ public MongoSink(Supplier<MongoClient> clientProvider) {
+ this.clientProvider = clientProvider;
+ }
@Override
public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
@@ -78,7 +90,12 @@
mongoConfig = MongoConfig.load(config);
mongoConfig.validate();
- mongoClient = MongoClients.create(mongoConfig.getMongoUri());
+ if (clientProvider != null) {
+ mongoClient = clientProvider.get();
+ } else {
+ mongoClient = MongoClients.create(mongoConfig.getMongoUri());
+ }
+
final MongoDatabase db = mongoClient.getDatabase(mongoConfig.getDatabase());
collection = db.getCollection(mongoConfig.getCollection());
diff --git a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkTest.java b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkTest.java
index c941708..c4ca44a 100644
--- a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkTest.java
+++ b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkTest.java
@@ -19,36 +19,35 @@
package org.apache.pulsar.io.mongodb;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
import com.mongodb.MongoBulkWriteException;
import com.mongodb.async.SingleResultCallback;
import com.mongodb.async.client.MongoClient;
-import com.mongodb.async.client.MongoClients;
import com.mongodb.async.client.MongoCollection;
import com.mongodb.async.client.MongoDatabase;
import com.mongodb.bulk.BulkWriteError;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.SinkContext;
import org.bson.BsonDocument;
import org.mockito.Mock;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
import org.testng.IObjectFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.ObjectFactory;
import org.testng.annotations.Test;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.*;
-
-@PrepareForTest(MongoClients.class)
-@PowerMockIgnore({"org.apache.logging.log4j.*"})
public class MongoSinkTest {
@Mock
@@ -78,7 +77,7 @@
@BeforeMethod
public void setUp() {
- sink = new MongoSink();
+
map = TestHelper.createMap(true);
mockRecord = mock(Record.class);
@@ -87,9 +86,8 @@
mockMongoDb = mock(MongoDatabase.class);
mockMongoColl = mock(MongoCollection.class);
- PowerMockito.mockStatic(MongoClients.class);
+ sink = new MongoSink(() -> mockMongoClient);
- when(MongoClients.create(anyString())).thenReturn(mockMongoClient);
when(mockMongoClient.getDatabase(anyString())).thenReturn(mockMongoDb);
when(mockMongoDb.getCollection(anyString())).thenReturn(mockMongoColl);
}
@@ -98,7 +96,7 @@
when(mockRecord.getValue()).thenReturn("{\"hello\":\"pulsar\"}".getBytes());
doAnswer((invocation) -> {
- SingleResultCallback cb = invocation.getArgumentAt(1, SingleResultCallback.class);
+ SingleResultCallback cb = invocation.getArgument(1, SingleResultCallback.class);
MongoBulkWriteException exc = null;
if (throwBulkError) {
@@ -109,17 +107,17 @@
cb.onResult(null, exc);
return null;
- }).when(mockMongoColl).insertMany(anyObject(), anyObject());
+ }).when(mockMongoColl).insertMany(any(), any());
}
private void initFailContext(String msg) {
when(mockRecord.getValue()).thenReturn(msg.getBytes());
doAnswer((invocation) -> {
- SingleResultCallback cb = invocation.getArgumentAt(1, SingleResultCallback.class);
+ SingleResultCallback cb = invocation.getArgument(1, SingleResultCallback.class);
cb.onResult(null, new Exception("Oops"));
return null;
- }).when(mockMongoColl).insertMany(anyObject(), anyObject());
+ }).when(mockMongoColl).insertMany(any(), any());
}
@AfterMethod