blob: 8b74d04b2a9c52a77f6ae47f53fe07ff00eb9bce [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.java.tool;
import apache.rocketmq.v2.AckMessageRequest;
import apache.rocketmq.v2.AckMessageResponse;
import apache.rocketmq.v2.Address;
import apache.rocketmq.v2.AddressScheme;
import apache.rocketmq.v2.Assignment;
import apache.rocketmq.v2.Broker;
import apache.rocketmq.v2.ChangeInvisibleDurationRequest;
import apache.rocketmq.v2.ChangeInvisibleDurationResponse;
import apache.rocketmq.v2.Code;
import apache.rocketmq.v2.Digest;
import apache.rocketmq.v2.DigestType;
import apache.rocketmq.v2.EndTransactionResponse;
import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueRequest;
import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueResponse;
import apache.rocketmq.v2.MessageQueue;
import apache.rocketmq.v2.MessageType;
import apache.rocketmq.v2.Permission;
import apache.rocketmq.v2.QueryAssignmentRequest;
import apache.rocketmq.v2.QueryAssignmentResponse;
import apache.rocketmq.v2.ReceiveMessageRequest;
import apache.rocketmq.v2.ReceiveMessageResponse;
import apache.rocketmq.v2.Resource;
import apache.rocketmq.v2.SendMessageRequest;
import apache.rocketmq.v2.SendMessageResponse;
import apache.rocketmq.v2.SendResultEntry;
import apache.rocketmq.v2.Status;
import apache.rocketmq.v2.SystemProperties;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.ByteString;
import io.grpc.Metadata;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.RandomUtils;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.java.impl.producer.PublishingSettings;
import org.apache.rocketmq.client.java.impl.producer.SendReceiptImpl;
import org.apache.rocketmq.client.java.message.MessageBuilderImpl;
import org.apache.rocketmq.client.java.message.MessageIdCodec;
import org.apache.rocketmq.client.java.message.MessageViewImpl;
import org.apache.rocketmq.client.java.misc.ClientId;
import org.apache.rocketmq.client.java.misc.RequestIdGenerator;
import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
import org.apache.rocketmq.client.java.misc.Utilities;
import org.apache.rocketmq.client.java.retry.CustomizedBackoffRetryPolicy;
import org.apache.rocketmq.client.java.retry.ExponentialBackoffRetryPolicy;
import org.apache.rocketmq.client.java.route.Endpoints;
import org.apache.rocketmq.client.java.route.MessageQueueImpl;
import org.apache.rocketmq.client.java.rpc.Context;
import org.apache.rocketmq.client.java.rpc.RpcFuture;
import org.apache.rocketmq.client.java.rpc.Signature;
import org.mockito.Mockito;
public class TestBase {
protected static final String FAKE_NAMESPACE = "foo-bar-namespace";
protected static final ClientId FAKE_CLIENT_ID = new ClientId();
protected static final String FAKE_TOPIC_0 = "foo-bar-topic-0";
protected static final String FAKE_TOPIC_1 = "foo-bar-topic-1";
protected static final byte[] FAKE_MESSAGE_BODY = "foobar".getBytes(StandardCharsets.UTF_8);
protected static final String FAKE_TAG_0 = "foo-bar-tag-0";
protected static final String FAKE_BROKER_NAME_0 = "foo-bar-broker-name-0";
protected static final String FAKE_BROKER_NAME_1 = "foo-bar-broker-name-1";
protected static final String FAKE_RECEIPT_HANDLE_0 = "foo-bar-handle-0";
protected static final String FAKE_RECEIPT_HANDLE_1 = "foo-bar-handle-1";
protected static final String FAKE_ENDPOINTS = "127.0.0.1:9876";
protected static final String FAKE_HOST_0 = "127.0.0.1";
protected static final int FAKE_PORT_0 = 8080;
protected static final String FAKE_HOST_1 = "127.0.0.2";
protected static final int FAKE_PORT_1 = 8081;
protected static final String FAKE_CONSUMER_GROUP_0 = "foo-bar-group-0";
protected static final String FAKE_TRANSACTION_ID = "foo-bar-transaction-id";
protected static final long FAKE_OFFSET = 1;
protected static final ScheduledExecutorService SCHEDULER =
new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), new ThreadFactoryImpl(
"TestScheduler"));
protected static final ThreadPoolExecutor SINGLE_THREAD_POOL_EXECUTOR =
new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(), new ThreadFactoryImpl("TestSingleWorker"));
protected Address fakePbAddress0() {
return fakePbAddress(FAKE_HOST_0, FAKE_PORT_0);
}
protected Address fakePbAddress1() {
return fakePbAddress(FAKE_HOST_1, FAKE_PORT_1);
}
protected Address fakePbAddress(String host, int port) {
return Address.newBuilder().setHost(host).setPort(port).build();
}
protected apache.rocketmq.v2.Endpoints fakePbEndpoints0() {
return fakePbEndpoints(fakePbAddress0());
}
protected apache.rocketmq.v2.Endpoints fakePbEndpoints1() {
return fakePbEndpoints(fakePbAddress1());
}
protected apache.rocketmq.v2.Endpoints fakePbEndpoints(Address address) {
return apache.rocketmq.v2.Endpoints.newBuilder().setScheme(AddressScheme.IPv4).addAddresses(address).build();
}
protected Endpoints fakeEndpoints() {
return new Endpoints(fakePbEndpoints0());
}
protected Context fakeRpcContext() {
final Metadata metadata = new Metadata();
metadata.put(Metadata.Key.of(Signature.REQUEST_ID_KEY, Metadata.ASCII_STRING_MARSHALLER),
RequestIdGenerator.getInstance().next());
return new Context(fakeEndpoints(), metadata);
}
protected Message fakeMessage(String topic) {
return new MessageBuilderImpl().setTopic(topic).setBody(RandomUtils.nextBytes(1)).build();
}
protected MessageViewImpl fakeMessageViewImpl() {
return fakeMessageViewImpl(1, false);
}
protected MessageViewImpl fakeMessageViewImpl(MessageQueueImpl mq) {
return fakeMessageViewImpl(mq, 1, false);
}
protected MessageViewImpl fakeMessageViewImpl(boolean corrupted) {
return fakeMessageViewImpl(1, corrupted);
}
protected MessageViewImpl fakeMessageViewImpl(int bodySize, boolean corrupted) {
return fakeMessageViewImpl(fakeMessageQueueImpl0(), bodySize, corrupted);
}
protected MessageViewImpl fakeMessageViewImpl(MessageQueueImpl mq, int bodySize, boolean corrupted) {
MessageId messageId = MessageIdCodec.getInstance().nextMessageId();
final byte[] body = RandomUtils.nextBytes(bodySize);
Map<String, String> properties = new HashMap<>();
List<String> keys = new ArrayList<>();
return new MessageViewImpl(messageId, FAKE_TOPIC_0, body, null, null, null,
keys, properties, FAKE_HOST_0, 1, 1, mq, FAKE_RECEIPT_HANDLE_0, 1, corrupted,
System.currentTimeMillis());
}
protected MessageQueueImpl fakeMessageQueueImpl(String topic) {
return new MessageQueueImpl(fakePbMessageQueue0(Resource.newBuilder().setName(topic).build()));
}
protected MessageQueueImpl fakeMessageQueueImpl(String namespace, String topic) {
return new MessageQueueImpl(fakePbMessageQueue0(
Resource.newBuilder()
.setResourceNamespace(namespace)
.setName(topic)
.build()));
}
protected MessageQueueImpl fakeMessageQueueImpl0() {
return new MessageQueueImpl(fakePbMessageQueue0());
}
protected MessageQueueImpl fakeMessageQueueImpl1() {
return new MessageQueueImpl(fakePbMessageQueue1());
}
protected Resource fakePbTopic0() {
return Resource.newBuilder().setName(FAKE_TOPIC_0).build();
}
protected Broker fakePbBroker0() {
return Broker.newBuilder().setEndpoints(fakePbEndpoints0()).setName(FAKE_BROKER_NAME_0)
.setId(Utilities.MASTER_BROKER_ID).build();
}
protected Broker fakePbBroker1() {
return Broker.newBuilder().setEndpoints(fakePbEndpoints1()).setName(FAKE_BROKER_NAME_1)
.setId(Utilities.MASTER_BROKER_ID).build();
}
protected MessageQueue fakePbMessageQueue0() {
return fakePbMessageQueue0(fakePbTopic0());
}
protected MessageQueue fakePbMessageQueue1() {
return fakePbMessageQueue1(fakePbTopic0());
}
protected MessageQueue fakePbMessageQueue0(Resource topicResource) {
return MessageQueue.newBuilder().setTopic(topicResource).setBroker(fakePbBroker0())
.setPermission(Permission.READ_WRITE).build();
}
protected MessageQueue fakePbMessageQueue1(Resource topicResource) {
return MessageQueue.newBuilder().setTopic(topicResource).setBroker(fakePbBroker1())
.setPermission(Permission.READ_WRITE).build();
}
protected RpcFuture<ChangeInvisibleDurationRequest, ChangeInvisibleDurationResponse>
okChangeInvisibleDurationCtxFuture() {
Status status = Status.newBuilder().setCode(Code.OK).build();
final ChangeInvisibleDurationResponse response =
ChangeInvisibleDurationResponse.newBuilder().setStatus(status).build();
return new RpcFuture<>(fakeRpcContext(), null, Futures.immediateFuture(response));
}
protected RpcFuture<ChangeInvisibleDurationRequest, ChangeInvisibleDurationResponse>
changInvisibleDurationCtxFuture(Code code) {
Status status = Status.newBuilder().setCode(code).build();
final ChangeInvisibleDurationResponse response =
ChangeInvisibleDurationResponse.newBuilder().setStatus(status).build();
return new RpcFuture<>(fakeRpcContext(), null, Futures.immediateFuture(response));
}
protected RpcFuture<QueryAssignmentRequest, QueryAssignmentResponse> okQueryAssignmentResponseFuture() {
final SettableFuture<QueryAssignmentResponse> future = SettableFuture.create();
final Status status = Status.newBuilder().setCode(Code.OK).build();
Assignment assignment = Assignment.newBuilder().setMessageQueue(fakePbMessageQueue0()).build();
QueryAssignmentResponse response = QueryAssignmentResponse.newBuilder().setStatus(status)
.addAssignments(assignment).build();
return new RpcFuture<>(fakeRpcContext(), null, Futures.immediateFuture(response));
}
protected RpcFuture<QueryAssignmentRequest, QueryAssignmentResponse> okEmptyQueryAssignmentResponseFuture() {
final SettableFuture<QueryAssignmentResponse> future = SettableFuture.create();
final Status status = Status.newBuilder().setCode(Code.OK).build();
final QueryAssignmentResponse response = QueryAssignmentResponse.newBuilder().setStatus(status).build();
return new RpcFuture<>(fakeRpcContext(), null, Futures.immediateFuture(response));
}
protected Map<String, FilterExpression> createSubscriptionExpressions(String topic) {
Map<String, FilterExpression> map = new HashMap<>();
final FilterExpression filterExpression = FilterExpression.SUB_ALL;
map.put(topic, filterExpression);
return map;
}
protected RpcFuture<AckMessageRequest, AckMessageResponse> ackMessageResponseFuture(Code code) {
final Status status = Status.newBuilder().setCode(code).build();
final AckMessageResponse response = AckMessageResponse.newBuilder().setStatus(status).build();
return new RpcFuture<>(fakeRpcContext(), null, Futures.immediateFuture(response));
}
protected RpcFuture<AckMessageRequest, AckMessageResponse> okAckMessageResponseFuture() {
final Status status = Status.newBuilder().setCode(Code.OK).build();
final AckMessageRequest request = Mockito.mock(AckMessageRequest.class);
final AckMessageResponse response = AckMessageResponse.newBuilder().setStatus(status).build();
return new RpcFuture<>(fakeRpcContext(), request,
Futures.immediateFuture(response));
}
protected RpcFuture<ForwardMessageToDeadLetterQueueRequest,
ForwardMessageToDeadLetterQueueResponse> okForwardMessageToDeadLetterQueueResponseFuture() {
final Status status = Status.newBuilder().setCode(Code.OK).build();
final ForwardMessageToDeadLetterQueueResponse response =
ForwardMessageToDeadLetterQueueResponse.newBuilder().setStatus(status).build();
return new RpcFuture<>(fakeRpcContext(), null, Futures.immediateFuture(response));
}
protected RpcFuture<ForwardMessageToDeadLetterQueueRequest,
ForwardMessageToDeadLetterQueueResponse> forwardMessageToDeadLetterQueueResponseFuture(Code code) {
final Status status = Status.newBuilder().setCode(code).build();
final ForwardMessageToDeadLetterQueueResponse response =
ForwardMessageToDeadLetterQueueResponse.newBuilder().setStatus(status).build();
return new RpcFuture<>(fakeRpcContext(), null, Futures.immediateFuture(response));
}
protected RpcFuture<SendMessageRequest, SendMessageResponse> okSendMessageResponseFutureWithSingleEntry() {
final Status status = Status.newBuilder().setCode(Code.OK).build();
final String messageId = MessageIdCodec.getInstance().nextMessageId().toString();
SendResultEntry entry = SendResultEntry.newBuilder().setMessageId(messageId)
.setTransactionId(FAKE_TRANSACTION_ID).setStatus(status).setOffset(1).build();
SendMessageResponse response = SendMessageResponse.newBuilder().setStatus(status).addEntries(entry).build();
return new RpcFuture<>(fakeRpcContext(), null, Futures.immediateFuture(response));
}
protected RpcFuture<SendMessageRequest, SendMessageResponse> failureSendMessageResponseFuture() {
final Status status = Status.newBuilder().setCode(Code.FORBIDDEN).build();
SendResultEntry sendResultEntry = SendResultEntry.newBuilder().setStatus(status).setStatus(status).build();
SendMessageResponse response = SendMessageResponse.newBuilder().setStatus(status)
.addEntries(sendResultEntry).build();
return new RpcFuture<>(fakeRpcContext(), null, Futures.immediateFuture(response));
}
protected ListenableFuture<SendMessageResponse> okBatchSendMessageResponseFuture() {
final Status status = Status.newBuilder().setCode(Code.OK).build();
SettableFuture<SendMessageResponse> future0 = SettableFuture.create();
final String messageId = MessageIdCodec.getInstance().nextMessageId().toString();
SendResultEntry entry0 = SendResultEntry.newBuilder().setMessageId(messageId).setStatus(status)
.setOffset(1).build();
SendResultEntry entry1 = SendResultEntry.newBuilder().setMessageId(messageId).setStatus(status)
.setOffset(2).build();
SendMessageResponse response = SendMessageResponse.newBuilder().setStatus(status).addEntries(entry0)
.addEntries(entry1).build();
future0.set(response);
return future0;
}
protected apache.rocketmq.v2.Message fakePbMessage(String topic) {
final Digest digest = Digest.newBuilder().setType(DigestType.CRC32).setChecksum("9EF61F95").build();
SystemProperties systemProperties = SystemProperties.newBuilder().setMessageType(MessageType.NORMAL)
.setMessageId(MessageIdCodec.getInstance().nextMessageId().toString())
.setBornHost(FAKE_HOST_0)
.setBodyDigest(digest)
.build();
Resource resource = Resource.newBuilder().setName(topic).build();
return apache.rocketmq.v2.Message.newBuilder().setSystemProperties(systemProperties)
.setTopic(resource).setBody(ByteString.copyFrom("foobar", StandardCharsets.UTF_8))
.setSystemProperties(systemProperties).build();
}
protected RpcFuture<ReceiveMessageRequest, List<ReceiveMessageResponse>> okReceiveMessageResponsesFuture(
String topic, int messageCount) {
final Status status = Status.newBuilder().setCode(Code.OK).build();
final apache.rocketmq.v2.Message message = fakePbMessage(topic);
List<ReceiveMessageResponse> responses = new ArrayList<>();
ReceiveMessageResponse statusResponse = ReceiveMessageResponse.newBuilder().setStatus(status).build();
responses.add(statusResponse);
for (int i = 0; i < messageCount; i++) {
ReceiveMessageResponse messageResponse = ReceiveMessageResponse.newBuilder().setMessage(message).build();
responses.add(messageResponse);
}
return new RpcFuture<>(fakeRpcContext(), null, Futures.immediateFuture(responses));
}
protected ListenableFuture<EndTransactionResponse> okEndTransactionResponseFuture() {
SettableFuture<EndTransactionResponse> future = SettableFuture.create();
final Status status = Status.newBuilder().setCode(Code.OK).build();
EndTransactionResponse response = EndTransactionResponse.newBuilder().setStatus(status).build();
future.set(response);
return future;
}
protected CustomizedBackoffRetryPolicy createCustomizedBackoffRetryPolicy(int maxAttempts) {
List<Duration> durations = new ArrayList<>();
durations.add(Duration.ofSeconds(1));
return new CustomizedBackoffRetryPolicy(durations, maxAttempts);
}
protected ExponentialBackoffRetryPolicy fakeExponentialBackoffRetryPolicy() {
return new ExponentialBackoffRetryPolicy(3, Duration.ofMillis(100), Duration.ofSeconds(3), 2);
}
protected PublishingSettings fakeProducerSettings() {
return new PublishingSettings(FAKE_NAMESPACE, FAKE_CLIENT_ID, fakeEndpoints(),
fakeExponentialBackoffRetryPolicy(), Duration.ofSeconds(1), new HashSet<>());
}
protected SendReceiptImpl fakeSendReceiptImpl(
MessageQueueImpl mq) throws ExecutionException, InterruptedException, ClientException {
final RpcFuture<SendMessageRequest, SendMessageResponse> future =
okSendMessageResponseFutureWithSingleEntry();
final List<SendReceiptImpl> receipts = SendReceiptImpl.processResponseInvocation(mq, future.get(), future);
return receipts.iterator().next();
}
}