Upgrade to Mockito 2.x (#4671)
Upgrading to Mockito 2.28 and PowerMock 2.0. This a pre-step to be able to run CI with Java 11 / 12
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
index 1ded3c6..bf19a18 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
@@ -18,9 +18,29 @@
*/
package org.apache.kafka.clients.producer;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.anyVararg;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
+
import org.apache.avro.reflect.Nullable;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.serialization.StringSerializer;
@@ -35,8 +55,6 @@
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -45,25 +63,6 @@
import org.testng.annotations.ObjectFactory;
import org.testng.annotations.Test;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.anyVararg;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
@PrepareForTest({PulsarClientKafkaConfig.class, PulsarProducerKafkaConfig.class})
@PowerMockIgnore({"org.apache.logging.log4j.*", "org.apache.kafka.clients.producer.ProducerInterceptor"})
public class PulsarKafkaProducerTest {
@@ -96,26 +95,20 @@
public void testPulsarKafkaProducer() {
ClientBuilder mockClientBuilder = mock(ClientBuilder.class);
ProducerBuilder mockProducerBuilder = mock(ProducerBuilder.class);
- doAnswer(new Answer() {
- @Override
- public Object answer(InvocationOnMock invocation) throws Throwable {
- Assert.assertEquals((int)invocation.getArguments()[0], 1000000, "Send time out is suppose to be 1000.");
- return mockProducerBuilder;
- }
+ doAnswer(invocation -> {
+ Assert.assertEquals((int)invocation.getArguments()[0], 1000000, "Send time out is suppose to be 1000.");
+ return mockProducerBuilder;
}).when(mockProducerBuilder).sendTimeout(anyInt(), any(TimeUnit.class));
doReturn(mockClientBuilder).when(mockClientBuilder).serviceUrl(anyString());
- doAnswer(new Answer() {
- @Override
- public Object answer(InvocationOnMock invocation) throws Throwable {
- Assert.assertEquals((int)invocation.getArguments()[0], 1000, "Keep alive interval is suppose to be 1000.");
- return mockClientBuilder;
- }
+ doAnswer(invocation -> {
+ Assert.assertEquals((int)invocation.getArguments()[0], 1000, "Keep alive interval is suppose to be 1000.");
+ return mockClientBuilder;
}).when(mockClientBuilder).keepAliveInterval(anyInt(), any(TimeUnit.class));
PowerMockito.mockStatic(PulsarClientKafkaConfig.class);
PowerMockito.mockStatic(PulsarProducerKafkaConfig.class);
when(PulsarClientKafkaConfig.getClientBuilder(any(Properties.class))).thenReturn(mockClientBuilder);
- when(PulsarProducerKafkaConfig.getProducerBuilder(any(PulsarClient.class), any(Properties.class))).thenReturn(mockProducerBuilder);
+ when(PulsarProducerKafkaConfig.getProducerBuilder(any(), any())).thenReturn(mockProducerBuilder);
Properties properties = new Properties();
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapperTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapperTest.java
index 0f15691..f96c9c7 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapperTest.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapperTest.java
@@ -48,7 +48,7 @@
import java.util.Arrays;
import java.util.Random;
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
index 7fd48c2..83a7549 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.flink.streaming.connectors.pulsar;
+import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.avro.generated.NasaMission;
@@ -30,7 +31,6 @@
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.mockito.Mockito;
-import org.mockito.internal.util.reflection.Whitebox;
import org.powermock.api.mockito.PowerMockito;
import org.testng.annotations.Test;
@@ -109,10 +109,10 @@
Mockito.any(SerializationSchema.class),
Mockito.any(PulsarKeyExtractor.class)
).thenReturn(producer);
- Whitebox.setInternalState(sink, "fieldNames", fieldNames);
- Whitebox.setInternalState(sink, "fieldTypes", typeInformations);
- Whitebox.setInternalState(sink, "serializationSchema", Mockito.mock(SerializationSchema.class));
- Whitebox.setInternalState(sink, "keyExtractor", Mockito.mock(PulsarKeyExtractor.class));
+ FieldUtils.writeField(sink, "fieldNames", fieldNames, true);
+ FieldUtils.writeField(sink, "fieldTypes", typeInformations, true);
+ FieldUtils.writeField(sink, "serializationSchema", Mockito.mock(SerializationSchema.class), true);
+ FieldUtils.writeField(sink, "keyExtractor", Mockito.mock(PulsarKeyExtractor.class), true);
return sink;
}
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java
index b96f971..82d831b 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java
@@ -38,7 +38,7 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.MessageImpl;
-import org.apache.pulsar.shade.io.netty.buffer.Unpooled;
+import io.netty.buffer.Unpooled;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
@@ -57,7 +57,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
/**
* Tests for the PulsarConsumerSource. The source supports two operation modes.
@@ -542,7 +542,7 @@
private static Message<byte[]> createMessage(String content, String messageId) {
return new MessageImpl<byte[]>("my-topic", messageId, Collections.emptyMap(),
- Unpooled.wrappedBuffer(content.getBytes()), Schema.BYTES);
+ content.getBytes(), Schema.BYTES);
}
private static String createMessageId(long ledgerId, long entryId, long partitionIndex) {
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
index 668a8e5..02462b3 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.flink.streaming.connectors.pulsar;
+import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -29,7 +30,6 @@
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.mockito.Mockito;
-import org.mockito.internal.util.reflection.Whitebox;
import org.powermock.api.mockito.PowerMockito;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -105,10 +105,11 @@
Mockito.any(SerializationSchema.class),
Mockito.any(PulsarKeyExtractor.class)
).thenReturn(producer);
- Whitebox.setInternalState(sink, "fieldNames", fieldNames);
- Whitebox.setInternalState(sink, "fieldTypes", typeInformations);
- Whitebox.setInternalState(sink, "serializationSchema", Mockito.mock(SerializationSchema.class));
- Whitebox.setInternalState(sink, "keyExtractor", Mockito.mock(PulsarKeyExtractor.class));
+
+ FieldUtils.writeField(sink, "fieldNames", fieldNames, true);
+ FieldUtils.writeField(sink, "fieldTypes", typeInformations, true);
+ FieldUtils.writeField(sink, "serializationSchema", Mockito.mock(SerializationSchema.class), true);
+ FieldUtils.writeField(sink, "keyExtractor", Mockito.mock(PulsarKeyExtractor.class), true);
return sink;
}
}
diff --git a/pulsar-log4j2-appender/src/main/java/org/apache/pulsar/log4j2/appender/PulsarAppender.java b/pulsar-log4j2-appender/src/main/java/org/apache/pulsar/log4j2/appender/PulsarAppender.java
index 68bfbd5..45aeca9 100644
--- a/pulsar-log4j2-appender/src/main/java/org/apache/pulsar/log4j2/appender/PulsarAppender.java
+++ b/pulsar-log4j2-appender/src/main/java/org/apache/pulsar/log4j2/appender/PulsarAppender.java
@@ -21,6 +21,7 @@
import java.io.Serializable;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
+
import org.apache.logging.log4j.core.AbstractLifeCycle;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.Filter;
diff --git a/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/PulsarAppenderTest.java b/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/PulsarAppenderTest.java
index 71b94ae..73cbadf 100644
--- a/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/PulsarAppenderTest.java
+++ b/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/PulsarAppenderTest.java
@@ -18,10 +18,10 @@
*/
package org.apache.pulsar.log4j2.appender;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyString;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -120,11 +120,30 @@
doReturn(producerBuilder).when(producerBuilder).topic(anyString());
doReturn(producerBuilder).when(producerBuilder).producerName(anyString());
doReturn(producerBuilder).when(producerBuilder).enableBatching(anyBoolean());
- doReturn(producerBuilder).when(producerBuilder).batchingMaxPublishDelay(anyInt(), any(TimeUnit.class));
+ doReturn(producerBuilder).when(producerBuilder).batchingMaxPublishDelay(anyLong(), any(TimeUnit.class));
doReturn(producerBuilder).when(producerBuilder).blockIfQueueFull(anyBoolean());
doReturn(producer).when(producerBuilder).create();
when(producer.newMessage()).then(invocation -> new MockedMessageBuilder());
+ when(producer.send(any(byte[].class)))
+ .thenAnswer(invocationOnMock -> {
+ Message<byte[]> msg = invocationOnMock.getArgument(0);
+ synchronized (history) {
+ history.add(msg);
+ }
+ return null;
+ });
+
+ when(producer.sendAsync(any(byte[].class)))
+ .thenAnswer(invocationOnMock -> {
+ Message<byte[]> msg = invocationOnMock.getArgument(0);
+ synchronized (history) {
+ history.add(msg);
+ }
+ CompletableFuture<MessageId> future = new CompletableFuture<>();
+ future.complete(mock(MessageId.class));
+ return future;
+ });
PulsarManager.PULSAR_CLIENT_BUILDER = () -> clientBuilder;
diff --git a/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java b/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
index 5764ac7..f8a1aa0 100644
--- a/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
+++ b/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
@@ -18,8 +18,8 @@
*/
package org.apache.pulsar.storm;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -33,7 +33,6 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pulsar.client.api.ClientBuilder;
@@ -43,26 +42,20 @@
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.MessageImpl;
-import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
-import org.apache.pulsar.shade.io.netty.buffer.PooledByteBufAllocator;
import org.apache.pulsar.storm.PulsarSpout.SpoutConsumer;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Values;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;
import com.google.common.collect.Maps;
public class PulsarSpoutTest {
- private static final Logger log = LoggerFactory.getLogger(PulsarSpoutTest.class);
-
@Test
public void testAckFailedMessage() throws Exception {
-
+
PulsarSpoutConfiguration conf = new PulsarSpoutConfiguration();
conf.setServiceUrl("http://localhost:8080");
conf.setSubscriptionName("sub1");
@@ -77,13 +70,13 @@
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
-
+
});
-
+
ClientBuilder builder = spy(new ClientBuilderImpl());
PulsarSpout spout = spy(new PulsarSpout(conf, builder));
-
- Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(), null, Schema.BYTES);
+
+ Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(), new byte[0], Schema.BYTES);
Consumer<byte[]> consumer = mock(Consumer.class);
SpoutConsumer spoutConsumer = new SpoutConsumer(consumer);
CompletableFuture<Void> future = new CompletableFuture<>();
@@ -92,13 +85,13 @@
Field consField = PulsarSpout.class.getDeclaredField("consumer");
consField.setAccessible(true);
consField.set(spout, spoutConsumer);
-
+
spout.fail(msg);
spout.ack(msg);
spout.emitNextAvailableTuple();
verify(consumer, atLeast(1)).receive(anyInt(), any());
}
-
+
@Test
public void testPulsarSpout() throws Exception {
PulsarSpoutConfiguration conf = new PulsarSpoutConfiguration();
@@ -138,9 +131,7 @@
when(client.getSharedConsumer(any())).thenReturn(consumer);
instances.put(componentId, client);
- ByteBuf data = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
- data.writeBytes("test".getBytes());
- Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(), data, Schema.BYTES);
+ Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(), "test".getBytes(), Schema.BYTES);
when(consumer.receive(anyInt(), any())).thenReturn(msg);
spout.open(config, context, collector);
@@ -149,5 +140,5 @@
assertTrue(called.get());
verify(consumer, atLeast(1)).receive(anyInt(), any());
}
-
+
}