
The Node.js Implementation of Apache RocketMQ Client

English | 简体中文 | RocketMQ Website

Overview

Before you begin, complete the following preparations (or refer to the quick start):

  1. Node.js 16.19.0 is the minimum required version; Node.js >= 18.17.0 is recommended.
  2. Set up the namesrv, broker, and proxy.
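
The version requirement above can be checked programmatically before starting a client. The following is a minimal sketch, not part of the client library: `compareVersions` and `classifyNodeVersion` are hypothetical helper names, and the comparison assumes plain dotted numeric versions without pre-release suffixes.

```typescript
// Hypothetical helper: compares dotted numeric versions such as "18.17.0".
// Returns -1, 0, or 1; missing components are treated as 0.
function compareVersions(a: string, b: string): number {
  const pa = a.split('.').map(Number);
  const pb = b.split('.').map(Number);
  for (let i = 0; i < Math.max(pa.length, pb.length); i++) {
    const diff = (pa[i] ?? 0) - (pb[i] ?? 0);
    if (diff !== 0) return diff < 0 ? -1 : 1;
  }
  return 0;
}

type Support = 'unsupported' | 'supported' | 'recommended';

// Classifies a Node.js version against the thresholds stated in this README.
function classifyNodeVersion(version: string): Support {
  if (compareVersions(version, '16.19.0') < 0) return 'unsupported';
  return compareVersions(version, '18.17.0') >= 0 ? 'recommended' : 'supported';
}
```

In a real script, pass `process.versions.node` (which has no leading `v`) to `classifyNodeVersion`.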

Getting Started

We use npm for dependency management and publishing. You can find more details about npm on its website. The following npm commands are useful during development.

# Install the project dependencies.
npm install
# Initialize the gRPC code.
npm run init
# Run the unit tests.
npm test
# Install the RocketMQ Node.js client.
npm i rocketmq-client-nodejs

Enable trace-level debug logging for grpc-js:

GRPC_VERBOSITY=debug GRPC_TRACE=all npm test

Publishing Steps

To publish the package to npm, register an npm account in advance, then run the following command.

# Builds a package and publishes it to the npm repository.
npm publish

Examples

Normal Message

Producer

import { Producer } from 'rocketmq-client-nodejs';

const producer = new Producer({
  endpoints: '127.0.0.1:8081',
});
await producer.startup();

const receipt = await producer.send({
  topic: 'TopicTest',
  tag: 'nodejs-demo',
  body: Buffer.from(JSON.stringify({
    hello: 'rocketmq-client-nodejs world 😄',
    now: Date(),
  })),
});
console.log(receipt);
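
The producer example sends its payload as JSON bytes, so a consumer has to reverse that encoding to read it. A minimal sketch of the round trip follows; `encodeBody` and `decodeBody` are hypothetical helper names, not part of the client library, and the sketch assumes the payload is a plain JSON-serializable object.

```typescript
// Encodes a plain object as UTF-8 JSON bytes, matching how the Producer example builds `body`.
function encodeBody(payload: Record<string, unknown>): Buffer {
  return Buffer.from(JSON.stringify(payload), 'utf-8');
}

// Decodes bytes produced by encodeBody back into an object.
// Throws a SyntaxError if the bytes are not valid JSON.
function decodeBody(body: Buffer): Record<string, unknown> {
  return JSON.parse(body.toString('utf-8')) as Record<string, unknown>;
}

const encoded = encodeBody({ hello: 'rocketmq-client-nodejs world 😄', now: new Date().toISOString() });
console.log(decodeBody(encoded));
```

On the consumer side, `decodeBody(message.body)` would recover the object, assuming the producer used the same encoding.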

Delay Message with Recall

Send and recall a delayed message:

import { Producer } from 'rocketmq-client-nodejs';

const producer = new Producer({
  endpoints: '127.0.0.1:8081',
});
await producer.startup();

// Send a delayed message (delivered after 10 seconds)
const receipt = await producer.send({
  topic: 'DelayTopic',
  tag: 'delay-recall',
  delay: 10000, // 10 seconds delay
  body: Buffer.from('This is a delayed message'),
});

console.log('Message sent:', {
  messageId: receipt.messageId,
  recallHandle: receipt.recallHandle, // Handle for recalling the message
});

// Recall the message before it's delivered (within 10 seconds)
try {
  const recallReceipt = await producer.recallMessage(
    'DelayTopic',
    receipt.recallHandle
  );
  console.log('Message recalled successfully:', recallReceipt.messageId);
} catch (error) {
  console.error('Failed to recall message:', error);
}

await producer.shutdown();

SimpleConsumer

import { SimpleConsumer } from 'rocketmq-client-nodejs';

const simpleConsumer = new SimpleConsumer({
  consumerGroup: 'nodejs-demo-group',
  endpoints: '127.0.0.1:8081',
  subscriptions: new Map().set('TopicTest', 'nodejs-demo'),
});
await simpleConsumer.startup();

const messages = await simpleConsumer.receive(20);
console.log('got %d messages', messages.length);
for (const message of messages) {
  console.log(message);
  console.log('body=%o', message.body.toString());
  await simpleConsumer.ack(message);
}
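
The example above receives a single batch. A long-running consumer usually repeats the receive and ack steps in a loop, which the sketch below illustrates. `ReceiverLike` and `drain` are hypothetical names that mirror only the `receive` and `ack` calls shown above; whether `SimpleConsumer` is structurally compatible with this interface is an assumption, as is the redelivery of unacknowledged messages by the server.

```typescript
// Hypothetical interface mirroring only the two calls used in the SimpleConsumer example.
interface ReceiverLike<M> {
  receive(maxMessageNum: number): Promise<M[]>;
  ack(message: M): Promise<unknown>;
}

// Receives batches until an empty batch arrives or maxBatches is reached.
// A message is acknowledged only after its handler resolves.
// Returns the number of messages that were handled and acknowledged.
async function drain<M>(
  consumer: ReceiverLike<M>,
  handle: (message: M) => Promise<void>,
  batchSize = 20,
  maxBatches = 10,
): Promise<number> {
  let processed = 0;
  for (let i = 0; i < maxBatches; i++) {
    const messages = await consumer.receive(batchSize);
    if (messages.length === 0) break;
    for (const message of messages) {
      try {
        await handle(message);
        await consumer.ack(message);
        processed++;
      } catch (error) {
        // The message stays unacknowledged; redelivery is assumed to be handled server-side.
        console.error('failed to process or ack message:', error);
      }
    }
  }
  return processed;
}
```

Acknowledging after the handler completes means a failed message is not lost, at the cost of possible duplicate processing.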

PushConsumer

PushConsumer actively pulls messages from the server and pushes them to the listener for processing:

import { PushConsumer, ConsumeResult, type MessageView } from 'rocketmq-client-nodejs';

// Create PushConsumer instance
const pushConsumer = new PushConsumer({
  namespace: '', // Namespace, can be empty string
  endpoints: '127.0.0.1:8081',
  consumerGroup: 'yourConsumerGroup',
  
  // Subscribe to topic and TAG
  subscriptions: new Map([
    ['yourTopic', '*'],  // Subscribe to yourTopic, receive all TAGs
  ]),
  
  // Message listener - core processing logic
  messageListener: {
    async consume(messageView: MessageView): Promise<ConsumeResult> {
      console.log('Received message:', messageView.body.toString('utf-8'));
      
      // TODO: Process your business logic here
      
      return ConsumeResult.SUCCESS; // Return SUCCESS after successful processing
    },
  },
});

try {
  // Start consumer
  await pushConsumer.startup();
  console.log('PushConsumer started, waiting for messages...');
  
  // Keep running, waiting for messages
  await new Promise(() => {});
} catch (error) {
  console.error('Error:', error);
  await pushConsumer.shutdown();
  throw error;
}
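
The listener above always returns SUCCESS. If the business logic can throw, one option is to wrap it so that failures are reported as a result value instead of escaping the listener. The sketch below is self-contained and uses local stand-ins: `ConsumeResultLike`, `ListenerLike`, and `toListener` are hypothetical names, and the existence of a `FAILURE` value on the library's `ConsumeResult` is an assumption, since only `SUCCESS` appears in this README.

```typescript
// Local stand-in for the library's ConsumeResult; FAILURE is an assumed counterpart to SUCCESS.
const ConsumeResultLike = { SUCCESS: 'SUCCESS', FAILURE: 'FAILURE' } as const;
type ConsumeResultLike = (typeof ConsumeResultLike)[keyof typeof ConsumeResultLike];

// Hypothetical shape mirroring the messageListener object in the PushConsumer example.
interface ListenerLike<M> {
  consume(message: M): Promise<ConsumeResultLike>;
}

// Wraps a handler so that a thrown error becomes FAILURE and normal completion becomes SUCCESS.
function toListener<M>(handler: (message: M) => Promise<void>): ListenerLike<M> {
  return {
    async consume(message: M): Promise<ConsumeResultLike> {
      try {
        await handler(message);
        return ConsumeResultLike.SUCCESS;
      } catch (error) {
        console.error('consume failed:', error);
        return ConsumeResultLike.FAILURE;
      }
    },
  };
}
```

With the real package, the same pattern would return the library's own `ConsumeResult` values in place of the stand-in.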

Current Progress

Message Type

  • [x] NORMAL
  • [x] FIFO
  • [x] DELAY
  • [x] TRANSACTION

Client Type

  • [x] PRODUCER
  • [x] SIMPLE_CONSUMER
  • [x] PUSH_CONSUMER
  • [ ] PULL_CONSUMER