Upgrade to Mockito 2.x (#4671)

Upgrading to Mockito 2.28 and PowerMock 2.0. This a pre-step to be able to run CI with Java 11 / 12


diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
index 1ded3c6..bf19a18 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
@@ -18,9 +18,29 @@
  */
 package org.apache.kafka.clients.producer;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.anyVararg;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
+
 import org.apache.avro.reflect.Nullable;
 import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -35,8 +55,6 @@
 import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
 import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -45,25 +63,6 @@
 import org.testng.annotations.ObjectFactory;
 import org.testng.annotations.Test;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.anyVararg;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
 @PrepareForTest({PulsarClientKafkaConfig.class, PulsarProducerKafkaConfig.class})
 @PowerMockIgnore({"org.apache.logging.log4j.*", "org.apache.kafka.clients.producer.ProducerInterceptor"})
 public class PulsarKafkaProducerTest {
@@ -96,26 +95,20 @@
     public void testPulsarKafkaProducer() {
         ClientBuilder mockClientBuilder = mock(ClientBuilder.class);
         ProducerBuilder mockProducerBuilder = mock(ProducerBuilder.class);
-        doAnswer(new Answer() {
-            @Override
-            public Object answer(InvocationOnMock invocation) throws Throwable {
-                Assert.assertEquals((int)invocation.getArguments()[0], 1000000, "Send time out is suppose to be 1000.");
-                return mockProducerBuilder;
-            }
+        doAnswer(invocation -> {
+            Assert.assertEquals((int)invocation.getArguments()[0], 1000000, "Send time out is suppose to be 1000.");
+            return mockProducerBuilder;
         }).when(mockProducerBuilder).sendTimeout(anyInt(), any(TimeUnit.class));
         doReturn(mockClientBuilder).when(mockClientBuilder).serviceUrl(anyString());
-        doAnswer(new Answer() {
-            @Override
-            public Object answer(InvocationOnMock invocation) throws Throwable {
-                Assert.assertEquals((int)invocation.getArguments()[0], 1000, "Keep alive interval is suppose to be 1000.");
-                return mockClientBuilder;
-            }
+        doAnswer(invocation -> {
+            Assert.assertEquals((int)invocation.getArguments()[0], 1000, "Keep alive interval is suppose to be 1000.");
+            return mockClientBuilder;
         }).when(mockClientBuilder).keepAliveInterval(anyInt(), any(TimeUnit.class));
 
         PowerMockito.mockStatic(PulsarClientKafkaConfig.class);
         PowerMockito.mockStatic(PulsarProducerKafkaConfig.class);
         when(PulsarClientKafkaConfig.getClientBuilder(any(Properties.class))).thenReturn(mockClientBuilder);
-        when(PulsarProducerKafkaConfig.getProducerBuilder(any(PulsarClient.class), any(Properties.class))).thenReturn(mockProducerBuilder);
+        when(PulsarProducerKafkaConfig.getProducerBuilder(any(), any())).thenReturn(mockProducerBuilder);
 
         Properties properties = new Properties();
         properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapperTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapperTest.java
index 0f15691..f96c9c7 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapperTest.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapperTest.java
@@ -48,7 +48,7 @@
 import java.util.Arrays;
 import java.util.Random;
 
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
index 7fd48c2..83a7549 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.flink.streaming.connectors.pulsar;
 
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.avro.generated.NasaMission;
@@ -30,7 +31,6 @@
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.mockito.Mockito;
-import org.mockito.internal.util.reflection.Whitebox;
 import org.powermock.api.mockito.PowerMockito;
 import org.testng.annotations.Test;
 
@@ -109,10 +109,10 @@
                 Mockito.any(SerializationSchema.class),
                 Mockito.any(PulsarKeyExtractor.class)
         ).thenReturn(producer);
-        Whitebox.setInternalState(sink, "fieldNames", fieldNames);
-        Whitebox.setInternalState(sink, "fieldTypes", typeInformations);
-        Whitebox.setInternalState(sink, "serializationSchema", Mockito.mock(SerializationSchema.class));
-        Whitebox.setInternalState(sink, "keyExtractor", Mockito.mock(PulsarKeyExtractor.class));
+        FieldUtils.writeField(sink, "fieldNames", fieldNames, true);
+        FieldUtils.writeField(sink, "fieldTypes", typeInformations, true);
+        FieldUtils.writeField(sink, "serializationSchema", Mockito.mock(SerializationSchema.class), true);
+        FieldUtils.writeField(sink, "keyExtractor", Mockito.mock(PulsarKeyExtractor.class), true);
         return sink;
     }
 
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java
index b96f971..82d831b 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java
@@ -38,7 +38,7 @@
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.MessageImpl;
-import org.apache.pulsar.shade.io.netty.buffer.Unpooled;
+import io.netty.buffer.Unpooled;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -57,7 +57,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
 
 /**
  * Tests for the PulsarConsumerSource. The source supports two operation modes.
@@ -542,7 +542,7 @@
 
     private static Message<byte[]> createMessage(String content, String messageId) {
         return new MessageImpl<byte[]>("my-topic", messageId, Collections.emptyMap(),
-                                       Unpooled.wrappedBuffer(content.getBytes()), Schema.BYTES);
+                                       content.getBytes(), Schema.BYTES);
     }
 
     private static String createMessageId(long ledgerId, long entryId, long partitionIndex) {
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
index 668a8e5..02462b3 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.flink.streaming.connectors.pulsar;
 
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -29,7 +30,6 @@
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.mockito.Mockito;
-import org.mockito.internal.util.reflection.Whitebox;
 import org.powermock.api.mockito.PowerMockito;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -105,10 +105,11 @@
                 Mockito.any(SerializationSchema.class),
                 Mockito.any(PulsarKeyExtractor.class)
         ).thenReturn(producer);
-        Whitebox.setInternalState(sink, "fieldNames", fieldNames);
-        Whitebox.setInternalState(sink, "fieldTypes", typeInformations);
-        Whitebox.setInternalState(sink, "serializationSchema", Mockito.mock(SerializationSchema.class));
-        Whitebox.setInternalState(sink, "keyExtractor", Mockito.mock(PulsarKeyExtractor.class));
+
+        FieldUtils.writeField(sink, "fieldNames", fieldNames, true);
+        FieldUtils.writeField(sink, "fieldTypes", typeInformations, true);
+        FieldUtils.writeField(sink, "serializationSchema", Mockito.mock(SerializationSchema.class), true);
+        FieldUtils.writeField(sink, "keyExtractor", Mockito.mock(PulsarKeyExtractor.class), true);
         return sink;
     }
 }
diff --git a/pulsar-log4j2-appender/src/main/java/org/apache/pulsar/log4j2/appender/PulsarAppender.java b/pulsar-log4j2-appender/src/main/java/org/apache/pulsar/log4j2/appender/PulsarAppender.java
index 68bfbd5..45aeca9 100644
--- a/pulsar-log4j2-appender/src/main/java/org/apache/pulsar/log4j2/appender/PulsarAppender.java
+++ b/pulsar-log4j2-appender/src/main/java/org/apache/pulsar/log4j2/appender/PulsarAppender.java
@@ -21,6 +21,7 @@
 import java.io.Serializable;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.logging.log4j.core.AbstractLifeCycle;
 import org.apache.logging.log4j.core.Appender;
 import org.apache.logging.log4j.core.Filter;
diff --git a/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/PulsarAppenderTest.java b/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/PulsarAppenderTest.java
index 71b94ae..73cbadf 100644
--- a/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/PulsarAppenderTest.java
+++ b/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/PulsarAppenderTest.java
@@ -18,10 +18,10 @@
  */
 package org.apache.pulsar.log4j2.appender;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyString;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -120,11 +120,30 @@
         doReturn(producerBuilder).when(producerBuilder).topic(anyString());
         doReturn(producerBuilder).when(producerBuilder).producerName(anyString());
         doReturn(producerBuilder).when(producerBuilder).enableBatching(anyBoolean());
-        doReturn(producerBuilder).when(producerBuilder).batchingMaxPublishDelay(anyInt(), any(TimeUnit.class));
+        doReturn(producerBuilder).when(producerBuilder).batchingMaxPublishDelay(anyLong(), any(TimeUnit.class));
         doReturn(producerBuilder).when(producerBuilder).blockIfQueueFull(anyBoolean());
         doReturn(producer).when(producerBuilder).create();
 
         when(producer.newMessage()).then(invocation -> new MockedMessageBuilder());
+        when(producer.send(any(byte[].class)))
+            .thenAnswer(invocationOnMock -> {
+                Message<byte[]> msg = invocationOnMock.getArgument(0);
+                synchronized (history) {
+                    history.add(msg);
+                }
+                return null;
+            });
+
+        when(producer.sendAsync(any(byte[].class)))
+            .thenAnswer(invocationOnMock -> {
+                Message<byte[]> msg = invocationOnMock.getArgument(0);
+                synchronized (history) {
+                    history.add(msg);
+                }
+                CompletableFuture<MessageId> future = new CompletableFuture<>();
+                future.complete(mock(MessageId.class));
+                return future;
+            });
 
         PulsarManager.PULSAR_CLIENT_BUILDER = () -> clientBuilder;
 
diff --git a/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java b/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
index 5764ac7..f8a1aa0 100644
--- a/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
+++ b/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
@@ -18,8 +18,8 @@
  */
 package org.apache.pulsar.storm;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyInt;
 import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -33,7 +33,6 @@
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.pulsar.client.api.ClientBuilder;
@@ -43,26 +42,20 @@
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
-import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
-import org.apache.pulsar.shade.io.netty.buffer.PooledByteBufAllocator;
 import org.apache.pulsar.storm.PulsarSpout.SpoutConsumer;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.tuple.Values;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.testng.annotations.Test;
 
 import com.google.common.collect.Maps;
 
 public class PulsarSpoutTest {
 
-    private static final Logger log = LoggerFactory.getLogger(PulsarSpoutTest.class);
-    
     @Test
     public void testAckFailedMessage() throws Exception {
-        
+
         PulsarSpoutConfiguration conf = new PulsarSpoutConfiguration();
         conf.setServiceUrl("http://localhost:8080");
         conf.setSubscriptionName("sub1");
@@ -77,13 +70,13 @@
             @Override
             public void declareOutputFields(OutputFieldsDeclarer declarer) {
             }
-            
+
         });
-        
+
         ClientBuilder builder = spy(new ClientBuilderImpl());
         PulsarSpout spout = spy(new PulsarSpout(conf, builder));
-        
-        Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(), null, Schema.BYTES);
+
+        Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(), new byte[0], Schema.BYTES);
         Consumer<byte[]> consumer = mock(Consumer.class);
         SpoutConsumer spoutConsumer = new SpoutConsumer(consumer);
         CompletableFuture<Void> future = new CompletableFuture<>();
@@ -92,13 +85,13 @@
         Field consField = PulsarSpout.class.getDeclaredField("consumer");
         consField.setAccessible(true);
         consField.set(spout, spoutConsumer);
-        
+
         spout.fail(msg);
         spout.ack(msg);
         spout.emitNextAvailableTuple();
         verify(consumer, atLeast(1)).receive(anyInt(), any());
     }
-    
+
     @Test
     public void testPulsarSpout() throws Exception {
         PulsarSpoutConfiguration conf = new PulsarSpoutConfiguration();
@@ -138,9 +131,7 @@
         when(client.getSharedConsumer(any())).thenReturn(consumer);
         instances.put(componentId, client);
 
-        ByteBuf data = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
-        data.writeBytes("test".getBytes());
-        Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(), data, Schema.BYTES);
+        Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(), "test".getBytes(), Schema.BYTES);
         when(consumer.receive(anyInt(), any())).thenReturn(msg);
 
         spout.open(config, context, collector);
@@ -149,5 +140,5 @@
         assertTrue(called.get());
         verify(consumer, atLeast(1)).receive(anyInt(), any());
     }
-    
+
 }