Upgrade to Mockito 2.x (#4671)

Upgrading to Mockito 2.28 and PowerMock 2.0. This a pre-step to be able to run CI with Java 11 / 12


diff --git a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileConsumerThreadTests.java b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileConsumerThreadTests.java
index a21633d..a4582bc 100644
--- a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileConsumerThreadTests.java
+++ b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileConsumerThreadTests.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.io.file;
 
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
diff --git a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileListingThreadTests.java b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileListingThreadTests.java
index 855498e..5be101f 100644
--- a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileListingThreadTests.java
+++ b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileListingThreadTests.java
@@ -19,7 +19,7 @@
 package org.apache.pulsar.io.file;
 
 import static org.testng.Assert.fail;
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
diff --git a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/ProcessedFileThreadTests.java b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/ProcessedFileThreadTests.java
index 98d35a9..9970a55 100644
--- a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/ProcessedFileThreadTests.java
+++ b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/ProcessedFileThreadTests.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.io.file;
 
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
diff --git a/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSinkTest.java b/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSinkTest.java
index fe8941e..a746f7e 100644
--- a/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSinkTest.java
+++ b/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSinkTest.java
@@ -19,8 +19,6 @@
 package org.apache.pulsar.io.influxdb;
 
 import com.google.common.collect.Maps;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
@@ -46,7 +44,7 @@
 
 import java.util.Map;
 
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -151,7 +149,7 @@
         verify(this.influxDB, times(1)).createDatabase("testDB");
 
         doAnswer(invocationOnMock -> {
-            BatchPoints batchPoints = invocationOnMock.getArgumentAt(0, BatchPoints.class);
+            BatchPoints batchPoints = invocationOnMock.getArgument(0, BatchPoints.class);
             Assert.assertNotNull(batchPoints, "batchPoints should not be null.");
             return null;
         }).when(influxDB).write(any(BatchPoints.class));
diff --git a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java
index 3eb6f6e..7dc665d 100644
--- a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java
+++ b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java
@@ -18,21 +18,14 @@
  */
 package org.apache.pulsar.io.mongodb;
 
+import static java.util.stream.Collectors.toList;
+
 import com.google.common.collect.Lists;
 import com.mongodb.MongoBulkWriteException;
 import com.mongodb.async.client.MongoClient;
 import com.mongodb.async.client.MongoClients;
 import com.mongodb.async.client.MongoCollection;
 import com.mongodb.async.client.MongoDatabase;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.Sink;
-import org.apache.pulsar.io.core.SinkContext;
-import org.apache.pulsar.io.core.annotations.Connector;
-import org.apache.pulsar.io.core.annotations.IOType;
-import org.bson.BSONException;
-import org.bson.Document;
-import org.bson.json.JsonParseException;
 
 import java.nio.charset.Charset;
 import java.util.ArrayList;
@@ -42,9 +35,19 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 import java.util.stream.IntStream;
 
-import static java.util.stream.Collectors.toList;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
+import org.bson.BSONException;
+import org.bson.Document;
+import org.bson.json.JsonParseException;
 
 /**
  * The base class for MongoDB sinks.
@@ -70,6 +73,15 @@
 
     private ScheduledExecutorService flushExecutor;
 
+    private Supplier<MongoClient> clientProvider;
+
+    public MongoSink() {
+        this(null);
+    }
+
+    public MongoSink(Supplier<MongoClient> clientProvider) {
+        this.clientProvider = clientProvider;
+    }
 
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
@@ -78,7 +90,12 @@
         mongoConfig = MongoConfig.load(config);
         mongoConfig.validate();
 
-        mongoClient = MongoClients.create(mongoConfig.getMongoUri());
+        if (clientProvider != null) {
+            mongoClient = clientProvider.get();
+        } else {
+            mongoClient = MongoClients.create(mongoConfig.getMongoUri());
+        }
+
         final MongoDatabase db = mongoClient.getDatabase(mongoConfig.getDatabase());
         collection = db.getCollection(mongoConfig.getCollection());
 
diff --git a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkTest.java b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkTest.java
index c941708..c4ca44a 100644
--- a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkTest.java
+++ b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkTest.java
@@ -19,36 +19,35 @@
 
 package org.apache.pulsar.io.mongodb;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 import com.mongodb.MongoBulkWriteException;
 import com.mongodb.async.SingleResultCallback;
 import com.mongodb.async.client.MongoClient;
-import com.mongodb.async.client.MongoClients;
 import com.mongodb.async.client.MongoCollection;
 import com.mongodb.async.client.MongoDatabase;
 import com.mongodb.bulk.BulkWriteError;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.SinkContext;
 import org.bson.BsonDocument;
 import org.mockito.Mock;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.testng.IObjectFactory;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.ObjectFactory;
 import org.testng.annotations.Test;
 
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.*;
-
-@PrepareForTest(MongoClients.class)
-@PowerMockIgnore({"org.apache.logging.log4j.*"})
 public class MongoSinkTest {
 
     @Mock
@@ -78,7 +77,7 @@
 
     @BeforeMethod
     public void setUp() {
-        sink = new MongoSink();
+
         map = TestHelper.createMap(true);
 
         mockRecord = mock(Record.class);
@@ -87,9 +86,8 @@
         mockMongoDb = mock(MongoDatabase.class);
         mockMongoColl = mock(MongoCollection.class);
 
-        PowerMockito.mockStatic(MongoClients.class);
+        sink = new MongoSink(() -> mockMongoClient);
 
-        when(MongoClients.create(anyString())).thenReturn(mockMongoClient);
         when(mockMongoClient.getDatabase(anyString())).thenReturn(mockMongoDb);
         when(mockMongoDb.getCollection(anyString())).thenReturn(mockMongoColl);
     }
@@ -98,7 +96,7 @@
         when(mockRecord.getValue()).thenReturn("{\"hello\":\"pulsar\"}".getBytes());
 
         doAnswer((invocation) -> {
-            SingleResultCallback cb = invocation.getArgumentAt(1, SingleResultCallback.class);
+            SingleResultCallback cb = invocation.getArgument(1, SingleResultCallback.class);
             MongoBulkWriteException exc = null;
 
             if (throwBulkError) {
@@ -109,17 +107,17 @@
 
             cb.onResult(null, exc);
             return null;
-        }).when(mockMongoColl).insertMany(anyObject(), anyObject());
+        }).when(mockMongoColl).insertMany(any(), any());
     }
 
     private void initFailContext(String msg) {
         when(mockRecord.getValue()).thenReturn(msg.getBytes());
 
         doAnswer((invocation) -> {
-            SingleResultCallback cb = invocation.getArgumentAt(1, SingleResultCallback.class);
+            SingleResultCallback cb = invocation.getArgument(1, SingleResultCallback.class);
             cb.onResult(null, new Exception("Oops"));
             return null;
-        }).when(mockMongoColl).insertMany(anyObject(), anyObject());
+        }).when(mockMongoColl).insertMany(any(), any());
     }
 
     @AfterMethod