Here is some background you may need before you start (or refer to the quick start).
We use npm for dependency management and publishing. You can find more details about npm on its website. The following npm commands are useful during development.
    # Installs the project dependencies.
    npm install

    # Init grpc codes.
    npm run init

    # Run the unit tests.
    npm test

    # Installs rocketmq nodejs client
    npm i rocketmq-client-nodejs
Enable trace-level debug logging for grpc-js (the original command set GRPC_TRACE twice; only the last value, all, takes effect):

    GRPC_VERBOSITY=debug GRPC_TRACE=all npm test
To publish a package to npm, register an account in advance, then run the following command.
    # Builds a package and publishes it to the npm repository.
    npm publish
Producer
    import { Producer } from 'rocketmq-client-nodejs';

    const producer = new Producer({
      endpoints: '127.0.0.1:8081',
    });
    await producer.startup();

    const receipt = await producer.send({
      topic: 'TopicTest',
      tag: 'nodejs-demo',
      body: Buffer.from(JSON.stringify({
        hello: 'rocketmq-client-nodejs world 😄',
        now: Date(),
      })),
    });
    console.log(receipt);
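The producer example builds its body with `Buffer.from(JSON.stringify(...))`. When both sides of a topic exchange JSON, it can be convenient to keep that encoding in one place. The following is a minimal sketch; `encodeJsonBody` and `decodeJsonBody` are hypothetical helper names, not part of rocketmq-client-nodejs, and the sketch assumes a Node.js runtime where `Buffer` is a global.

```typescript
// Hypothetical helpers (not part of rocketmq-client-nodejs).

/** Serialize a plain object into a UTF-8 Buffer usable as a message body. */
function encodeJsonBody(payload: Record<string, unknown>): Buffer {
  return Buffer.from(JSON.stringify(payload), 'utf-8');
}

/** Parse a body produced by encodeJsonBody back into a typed value. */
function decodeJsonBody<T>(body: Buffer): T {
  // JSON.parse throws a SyntaxError if the body is not valid JSON.
  return JSON.parse(body.toString('utf-8')) as T;
}

// Round trip: what a producer would send and what a consumer would read back.
const body = encodeJsonBody({ hello: 'rocketmq-client-nodejs world 😄', attempt: 1 });
const decoded = decodeJsonBody<{ hello: string; attempt: number }>(body);
console.log(decoded.hello, decoded.attempt);
```

The result of `encodeJsonBody(...)` could be passed as the `body` field of `producer.send(...)`, and `decodeJsonBody` applied to `message.body` on the consumer side.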
Send and recall a delayed message:
    import { Producer } from 'rocketmq-client-nodejs';

    const producer = new Producer({
      endpoints: '127.0.0.1:8081',
    });
    await producer.startup();

    // Send a delay message (will be delivered after 10 seconds)
    const receipt = await producer.send({
      topic: 'DelayTopic',
      tag: 'delay-recall',
      delay: 10000, // 10 seconds delay
      body: Buffer.from('This is a delayed message'),
    });
    console.log('Message sent:', {
      messageId: receipt.messageId,
      recallHandle: receipt.recallHandle, // Handle for recalling the message
    });

    // Recall the message before it's delivered (within 10 seconds)
    try {
      const recallReceipt = await producer.recallMessage(
        'DelayTopic',
        receipt.recallHandle
      );
      console.log('Message recalled successfully:', recallReceipt.messageId);
    } catch (error) {
      console.error('Failed to recall message:', error);
    }

    await producer.shutdown();
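Since a recall is only useful before the delay elapses, an application may want to track the remaining window locally before calling `recallMessage`. The sketch below is one way to do that under stated assumptions: `remainingDelayMs` and `isRecallWorthTrying` are hypothetical helpers, the sender's clock is used as an approximation of when the delay started, and the 500 ms safety margin is an arbitrary illustrative value. The broker remains the final authority on whether a recall succeeds.

```typescript
// Hypothetical helpers (not part of rocketmq-client-nodejs).

/** Milliseconds left before a delayed message becomes deliverable (never negative). */
function remainingDelayMs(sentAtMs: number, delayMs: number, nowMs: number = Date.now()): number {
  return Math.max(0, sentAtMs + delayMs - nowMs);
}

/**
 * True while more than marginMs of the delay remains, leaving headroom
 * for the network round trip of the recall request itself.
 */
function isRecallWorthTrying(
  sentAtMs: number,
  delayMs: number,
  nowMs: number = Date.now(),
  marginMs: number = 500, // illustrative value, tune for your network
): boolean {
  return remainingDelayMs(sentAtMs, delayMs, nowMs) > marginMs;
}

// Example with fixed timestamps: sent at t=1000 ms with a 10 s delay, checked at t=4000 ms.
console.log(remainingDelayMs(1000, 10000, 4000));
console.log(isRecallWorthTrying(1000, 10000, 4000));
```

In the delayed-message example, `sentAtMs` would be a `Date.now()` value captured right after `producer.send(...)` resolves, and `delayMs` the same value passed as `delay`.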
SimpleConsumer
    import { SimpleConsumer } from 'rocketmq-client-nodejs';

    const simpleConsumer = new SimpleConsumer({
      consumerGroup: 'nodejs-demo-group',
      endpoints: '127.0.0.1:8081',
      subscriptions: new Map().set('TopicTest', 'nodejs-demo'),
    });
    await simpleConsumer.startup();

    const messages = await simpleConsumer.receive(20);
    console.log('got %d messages', messages.length);
    for (const message of messages) {
      console.log(message);
      console.log('body=%o', message.body.toString());
      await simpleConsumer.ack(message);
    }
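The SimpleConsumer example acknowledges every message unconditionally. In practice you may want to acknowledge only the messages that were processed successfully, so that failed ones can be delivered again later. The sketch below illustrates that pattern against a small structural interface, `ReceivingConsumer`, which is a hypothetical type that mirrors only the `receive` and `ack` calls used above; `processBatch` and the in-memory `fakeConsumer` are likewise illustrative and not part of the client.

```typescript
// Hypothetical structural view of the two consumer calls used in the example above.
interface ReceivingConsumer<M> {
  receive(maxMessageNum: number): Promise<M[]>;
  ack(message: M): Promise<unknown>;
}

/** Receive one batch, ack each message only if its handler succeeds. */
async function processBatch<M>(
  consumer: ReceivingConsumer<M>,
  handle: (message: M) => Promise<void>,
  batchSize: number = 20,
): Promise<{ acked: number; failed: number }> {
  const messages = await consumer.receive(batchSize);
  let acked = 0;
  let failed = 0;
  for (const message of messages) {
    try {
      await handle(message);
      await consumer.ack(message);
      acked += 1;
    } catch (error) {
      // The message is left unacknowledged on purpose.
      failed += 1;
      console.error('processing failed, message left unacknowledged:', error);
    }
  }
  return { acked, failed };
}

// In-memory stand-in so the sketch runs without a broker.
const ackedBodies: string[] = [];
const fakeConsumer: ReceivingConsumer<string> = {
  async receive(maxMessageNum) {
    return ['a', 'bad', 'c'].slice(0, maxMessageNum);
  },
  async ack(message) {
    ackedBodies.push(message);
  },
};

const batchResult = processBatch(fakeConsumer, async (message) => {
  if (message === 'bad') throw new Error('cannot handle ' + message);
});
batchResult.then((summary) => console.log(summary, ackedBodies));
```

If the real `SimpleConsumer` methods have the shapes the example above implies, an instance could be passed in place of `fakeConsumer`; that compatibility is an assumption and should be checked against the client's type definitions.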
PushConsumer actively pulls messages from the server and pushes them to the listener for processing:
    import { PushConsumer, ConsumeResult, type MessageView } from 'rocketmq-client-nodejs';

    // Create PushConsumer instance
    const pushConsumer = new PushConsumer({
      namespace: '', // Namespace, can be empty string
      endpoints: '127.0.0.1:8081',
      consumerGroup: 'yourConsumerGroup',
      // Subscribe to topic and TAG
      subscriptions: new Map([
        ['yourTopic', '*'], // Subscribe to yourTopic, receive all TAGs
      ]),
      // Message listener - core processing logic
      messageListener: {
        async consume(messageView: MessageView): Promise<ConsumeResult> {
          console.log('Received message:', messageView.body.toString('utf-8'));
          // TODO: Process your business logic here
          return ConsumeResult.SUCCESS; // Return SUCCESS after successful processing
        },
      },
    });

    try {
      // Start consumer
      await pushConsumer.startup();
      console.log('PushConsumer started, waiting for messages...');
      // Keep running, waiting for messages
      await new Promise(() => {});
    } catch (error) {
      console.error('Error:', error);
      await pushConsumer.shutdown();
      throw error;
    }
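The listener above always returns `ConsumeResult.SUCCESS`. One possible way to handle business-logic errors is to wrap the handler so that an exception becomes a failure result instead of escaping the listener. The sketch below uses a local `Result` object as a stand-in for the library's `ConsumeResult`; that a `FAILURE` member exists alongside `SUCCESS` is an assumption here, and `Listener` and `guardedListener` are hypothetical names introduced for illustration.

```typescript
// Local stand-in for the library's ConsumeResult.
// Assumption: the real enum offers a FAILURE member next to SUCCESS.
const Result = { SUCCESS: 'SUCCESS', FAILURE: 'FAILURE' } as const;
type Result = (typeof Result)[keyof typeof Result];

// Hypothetical shape mirroring the messageListener object in the example above.
interface Listener<M> {
  consume(message: M): Promise<Result>;
}

/** Wrap a handler so that a thrown error is reported as FAILURE rather than propagated. */
function guardedListener<M>(handler: (message: M) => Promise<void>): Listener<M> {
  return {
    async consume(message: M): Promise<Result> {
      try {
        await handler(message);
        return Result.SUCCESS;
      } catch (error) {
        console.error('handler failed:', error);
        return Result.FAILURE;
      }
    },
  };
}

// Usage with plain strings standing in for message views.
const listener = guardedListener<string>(async (text) => {
  if (text.length === 0) throw new Error('empty message');
});
const outcomes = Promise.all([listener.consume('hello'), listener.consume('')]);
outcomes.then((results) => console.log(results));
```

With the real client, the same wrapper idea would return the library's own `ConsumeResult` values and be passed as `messageListener`; how the server treats a failure result (for example, redelivery) depends on the broker configuration.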