blob: 66deec77701c5b58c526678da211c1168c4598f1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { strict as assert } from 'node:assert';
import { randomUUID } from 'node:crypto';
import { NotFoundException, Producer, SimpleConsumer } from '../../src';
import { TransactionResolution } from '../../proto/apache/rocketmq/v2/definition_pb';
import { topics, endpoints, sessionCredentials, consumerGroup, namespace } from '../helper';
describe('test/producer/Producer.test.ts', () => {
let producer: Producer | null = null;
let simpleConsumer: SimpleConsumer | null = null;
afterEach(async () => {
if (producer) {
await producer.shutdown();
producer = null;
}
if (simpleConsumer) {
await simpleConsumer.shutdown();
simpleConsumer = null;
}
});
describe('startup()', () => {
it('should startup success', async () => {
producer = new Producer({
endpoints,
namespace,
sessionCredentials,
maxAttempts: 2,
});
await producer.startup();
const sendReceipt = await producer.send({
topic: topics.normal,
tag: 'nodejs-unittest',
keys: [
`foo-key-${Date.now()}`,
`bar-key-${Date.now()}`,
],
body: Buffer.from(JSON.stringify({
hello: 'rocketmq-client-nodejs world 😄',
now: Date(),
})),
});
// console.log('sendReceipt: %o', sendReceipt);
assert(sendReceipt.offset >= 0);
assert.equal(typeof sendReceipt.messageId, 'string');
assert.equal(sendReceipt.messageId, sendReceipt.transactionId);
});
it('should startup fail when topic not exists', async () => {
await assert.rejects(async () => {
producer = new Producer({
topic: 'TopicTest-not-exists',
endpoints,
namespace,
sessionCredentials,
maxAttempts: 2,
});
await producer.startup();
}, (err: any) => {
assert.match(err.message, /Startup the rocketmq client failed, clientId=[^,]+, error=NotFoundException/);
assert.equal(err.cause instanceof NotFoundException, true);
assert.equal(err.cause.name, 'NotFoundException');
assert.equal(err.cause.code, 40402);
assert.match(err.cause.message, /CODE: 17 {2}DESC: No topic route info in name server for the topic: TopicTest-not-exists/);
return true;
});
});
});
describe('send()', () => {
it('should send normal message', async () => {
const topic = topics.normal;
const tag = `nodejs-unittest-tag-${randomUUID()}`;
producer = new Producer({
endpoints,
namespace,
sessionCredentials,
maxAttempts: 2,
});
await producer.startup();
const receipt = await producer.send({
topic,
tag,
keys: [
`foo-key-${Date.now()}`,
`bar-key-${Date.now()}`,
],
body: Buffer.from(JSON.stringify({
hello: 'rocketmq-client-nodejs world 😄',
now: Date(),
})),
});
assert(receipt.messageId);
simpleConsumer = new SimpleConsumer({
consumerGroup,
endpoints,
namespace,
sessionCredentials,
subscriptions: new Map().set(topic, tag),
awaitDuration: 3000,
});
await simpleConsumer.startup();
const messages = await simpleConsumer.receive(1, 10000);
assert.equal(messages.length, 1);
assert.equal(messages[0].messageId, receipt.messageId);
await simpleConsumer.ack(messages[0]);
});
it('should send delay message', async () => {
const topic = topics.delay;
const tag = `nodejs-unittest-tag-${randomUUID()}`;
producer = new Producer({
endpoints,
namespace,
sessionCredentials,
maxAttempts: 2,
});
await producer.startup();
const startTime = Date.now();
const receipt = await producer.send({
topic,
tag,
delay: 1000,
keys: [
`foo-key-${Date.now()}`,
`bar-key-${Date.now()}`,
],
body: Buffer.from(JSON.stringify({
hello: 'rocketmq-client-nodejs world 😄',
now: Date(),
})),
});
assert(receipt.messageId);
simpleConsumer = new SimpleConsumer({
consumerGroup,
endpoints,
namespace,
sessionCredentials,
subscriptions: new Map().set(topic, tag),
awaitDuration: 3000,
});
await simpleConsumer.startup();
const messages = await simpleConsumer.receive(1, 10000);
assert.equal(messages.length, 1);
const message = messages[0];
assert.equal(message.messageId, receipt.messageId);
assert(message.transportDeliveryTimestamp);
assert(message.transportDeliveryTimestamp.getTime() - startTime >= 1000);
await simpleConsumer.ack(message);
});
it('should send fifo message', async () => {
const topic = topics.fifo;
const tag = `nodejs-unittest-tag-${randomUUID()}`;
producer = new Producer({
endpoints,
namespace,
sessionCredentials,
maxAttempts: 2,
});
await producer.startup();
simpleConsumer = new SimpleConsumer({
consumerGroup,
endpoints,
namespace,
sessionCredentials,
subscriptions: new Map().set(topic, tag),
awaitDuration: 3000,
});
await simpleConsumer.startup();
// skip the first message
await producer.send({
topic,
tag,
body: Buffer.from(JSON.stringify({
hello: 'rocketmq-client-nodejs world 😄, first',
now: Date(),
})),
messageGroup: 'fifoMessageGroup',
});
await simpleConsumer.receive(1, 10000);
const receipt1 = await producer.send({
topic,
tag,
body: Buffer.from(JSON.stringify({
hello: 'rocketmq-client-nodejs world 😄, first',
now: Date(),
})),
messageGroup: 'fifoMessageGroup',
});
assert(receipt1.messageId);
const receipt2 = await producer.send({
topic,
tag,
body: Buffer.from(JSON.stringify({
hello: 'rocketmq-client-nodejs world 😄, second',
now: Date(),
})),
messageGroup: 'fifoMessageGroup',
});
assert(receipt2.messageId);
let messages = await simpleConsumer.receive(1, 10000);
assert.equal(messages.length, 1);
let message = messages[0];
assert.equal(JSON.parse(message.body.toString()).hello, 'rocketmq-client-nodejs world 😄, first');
assert.equal(message.messageId, receipt1.messageId);
assert(message.messageGroup);
assert.equal(message.messageGroup, 'fifoMessageGroup');
assert.equal(message.properties.get('__SHARDINGKEY'), 'fifoMessageGroup');
await simpleConsumer.ack(message);
messages = await simpleConsumer.receive(1, 10000);
assert.equal(messages.length, 1);
message = messages[0];
assert.equal(JSON.parse(message.body.toString()).hello, 'rocketmq-client-nodejs world 😄, second');
assert.equal(message.messageId, receipt2.messageId);
assert(message.messageGroup);
assert.equal(message.messageGroup, 'fifoMessageGroup');
assert.equal(message.properties.get('__SHARDINGKEY'), 'fifoMessageGroup');
await simpleConsumer.ack(message);
});
it('should send transaction message', async () => {
const topic = topics.transaction;
const tag = `nodejs-unittest-tag-${randomUUID()}`;
producer = new Producer({
endpoints,
namespace,
sessionCredentials,
maxAttempts: 2,
checker: {
async check(messageView) {
console.log(messageView);
return TransactionResolution.COMMIT;
},
},
});
await producer.startup();
const transaction = producer.beginTransaction();
const receipt = await producer.send({
topic,
tag,
keys: [
`foo-key-${Date.now()}`,
`bar-key-${Date.now()}`,
],
body: Buffer.from(JSON.stringify({
hello: 'rocketmq-client-nodejs world 😄',
now: Date(),
})),
}, transaction);
await transaction.commit();
simpleConsumer = new SimpleConsumer({
consumerGroup,
endpoints,
namespace,
sessionCredentials,
subscriptions: new Map().set(topic, tag),
awaitDuration: 3000,
});
await simpleConsumer.startup();
const messages = await simpleConsumer.receive(2, 10000);
assert.equal(messages.length, 1);
const message = messages[0];
assert.equal(message.messageId, receipt.messageId);
// console.log(message);
assert.equal(message.properties.get('__transactionId__'), receipt.transactionId);
await simpleConsumer.ack(message);
});
});
});