Add namespace to client configuration (#750)
* Add namespace to clientConfiguration
* update beta version
* add namespace check
* add namespace
* add namespace
* update version
* Remove empty values from settings
* update format and single quotes
* update format
* update version
* remove codecov
* Change namespace to required
* Change namespace to required
* Change namespace to required
* Add the default namespace parameter to the tests
---------
Co-authored-by: zh378814 <wb-zh378814@alibaba-inc.com>
diff --git a/.github/workflows/nodejs_coverage.yml b/.github/workflows/nodejs_coverage.yml
deleted file mode 100644
index 2a29a5b..0000000
--- a/.github/workflows/nodejs_coverage.yml
+++ /dev/null
@@ -1,46 +0,0 @@
-name: Node.js Coverage
-on:
- pull_request:
- types: [opened, reopened, synchronize]
- paths:
- - 'nodejs/**'
- push:
- branches:
- - master
-
-jobs:
- build:
- runs-on: ubuntu-latest
- strategy:
- matrix:
- node-version: [16.19.0, 16.x, 18.x, 20.x]
- steps:
- - name: Checkout Git Source
- uses: actions/checkout@v3
- with:
- submodules: recursive
-
- - name: Use Node.js ${{ matrix.node-version }}
- uses: actions/setup-node@v3
- with:
- node-version: ${{ matrix.node-version }}
-
- - name: Install dependencies
- working-directory: ./nodejs
- run: npm i && npm run init
-
- - name: Start RocketMQ Server
- working-directory: ./nodejs
- run: npm run start-rocketmq
-
- - name: Run test
- working-directory: ./nodejs
- run: npm run ci
-
- - name: Code Coverage
- uses: codecov/codecov-action@v3
- with:
- files: ./nodejs/coverage/coverage-final.json
- flags: nodejs
- fail_ci_if_error: true
- verbose: true
diff --git a/nodejs/examples/Producer.ts b/nodejs/examples/Producer.ts
index c90c761..6b8be4c 100644
--- a/nodejs/examples/Producer.ts
+++ b/nodejs/examples/Producer.ts
@@ -19,6 +19,7 @@
const producer = new Producer({
endpoints: '127.0.0.1:8081',
+ namespace: ''
});
await producer.startup();
diff --git a/nodejs/examples/SimpleConsumer.ts b/nodejs/examples/SimpleConsumer.ts
index c58d9d1..fb36c38 100644
--- a/nodejs/examples/SimpleConsumer.ts
+++ b/nodejs/examples/SimpleConsumer.ts
@@ -20,6 +20,7 @@
const simpleConsumer = new SimpleConsumer({
consumerGroup: 'nodejs-demo-group',
endpoints: '127.0.0.1:8081',
+ namespace: '',
subscriptions: new Map().set('TopicTest', 'nodejs-demo'),
});
await simpleConsumer.startup();
diff --git a/nodejs/src/client/BaseClient.ts b/nodejs/src/client/BaseClient.ts
index 9fa12da..77f790b 100644
--- a/nodejs/src/client/BaseClient.ts
+++ b/nodejs/src/client/BaseClient.ts
@@ -57,6 +57,7 @@
* - example.com:8443
*/
endpoints: string;
+ namespace: string;
sessionCredentials?: SessionCredentials;
requestTimeout?: number;
logger?: ILogger;
@@ -76,6 +77,7 @@
readonly clientType = ClientType.CLIENT_TYPE_UNSPECIFIED;
readonly sslEnabled: boolean;
readonly #sessionCredentials?: SessionCredentials;
+ readonly namespace: string;
protected readonly endpoints: Endpoints;
protected readonly isolated = new Map<string, Endpoints>();
protected readonly requestTimeout: number;
@@ -92,6 +94,7 @@
this.logger = options.logger ?? getDefaultLogger();
this.sslEnabled = options.sslEnabled === true;
this.endpoints = new Endpoints(options.endpoints);
+ this.namespace = options.namespace;
this.#sessionCredentials = options.sessionCredentials;
// https://rocketmq.apache.org/docs/introduction/03limits/
// Default request timeout is 3000ms
@@ -288,6 +291,9 @@
metadata.set('x-mq-language', 'HTTP');
// version of client
metadata.set('x-mq-client-version', UserAgent.INSTANCE.version);
+ if (this.namespace) {
+ metadata.set('x-mq-namespace', this.namespace);
+ }
if (this.#sessionCredentials) {
if (this.#sessionCredentials.securityToken) {
metadata.set('x-mq-session-token', this.#sessionCredentials.securityToken);
diff --git a/nodejs/src/client/Settings.ts b/nodejs/src/client/Settings.ts
index 34f7eeb..d67b51a 100644
--- a/nodejs/src/client/Settings.ts
+++ b/nodejs/src/client/Settings.ts
@@ -20,15 +20,16 @@
import { RetryPolicy } from '../retry';
export abstract class Settings {
+ protected readonly namespace: string;
protected readonly clientId: string;
protected readonly clientType: ClientType;
protected readonly accessPoint: Endpoints;
protected retryPolicy?: RetryPolicy;
protected readonly requestTimeout: number;
- constructor(clientId: string, clientType: ClientType, accessPoint: Endpoints, requestTimeout: number,
- retryPolicy?: RetryPolicy) {
+ constructor(namespace: string, clientId: string, clientType: ClientType, accessPoint: Endpoints, requestTimeout: number, retryPolicy?: RetryPolicy) {
this.clientId = clientId;
+ this.namespace = namespace;
this.clientType = clientType;
this.accessPoint = accessPoint;
this.retryPolicy = retryPolicy;
diff --git a/nodejs/src/consumer/SimpleConsumer.ts b/nodejs/src/consumer/SimpleConsumer.ts
index 916f362..b5b045a 100644
--- a/nodejs/src/consumer/SimpleConsumer.ts
+++ b/nodejs/src/consumer/SimpleConsumer.ts
@@ -60,7 +60,7 @@
}
}
this.#awaitDuration = options.awaitDuration ?? 30000;
- this.#simpleSubscriptionSettings = new SimpleSubscriptionSettings(this.clientId, this.endpoints,
+ this.#simpleSubscriptionSettings = new SimpleSubscriptionSettings(options.namespace, this.clientId, this.endpoints,
this.consumerGroup, this.requestTimeout, this.#awaitDuration, this.#subscriptionExpressions);
}
diff --git a/nodejs/src/consumer/SimpleSubscriptionSettings.ts b/nodejs/src/consumer/SimpleSubscriptionSettings.ts
index 116a613..f12c4e2 100644
--- a/nodejs/src/consumer/SimpleSubscriptionSettings.ts
+++ b/nodejs/src/consumer/SimpleSubscriptionSettings.ts
@@ -30,9 +30,8 @@
readonly group: string;
readonly subscriptionExpressions: Map<string, FilterExpression>;
- constructor(clientId: string, accessPoint: Endpoints, group: string,
- requestTimeout: number, longPollingTimeout: number, subscriptionExpressions: Map<string, FilterExpression>) {
- super(clientId, ClientType.SIMPLE_CONSUMER, accessPoint, requestTimeout);
+ constructor(namespace: string, clientId: string, accessPoint: Endpoints, group: string, requestTimeout: number, longPollingTimeout: number, subscriptionExpressions: Map<string, FilterExpression>) {
+ super(namespace, clientId, ClientType.SIMPLE_CONSUMER, accessPoint, requestTimeout);
this.longPollingTimeout = longPollingTimeout;
this.group = group;
this.subscriptionExpressions = subscriptionExpressions;
diff --git a/nodejs/src/message/PublishingMessage.ts b/nodejs/src/message/PublishingMessage.ts
index b6015bf..959d2f2 100644
--- a/nodejs/src/message/PublishingMessage.ts
+++ b/nodejs/src/message/PublishingMessage.ts
@@ -68,7 +68,7 @@
* This method should be invoked before each message sending, because the born time is reset before each
* invocation, which means that it should not be invoked ahead of time.
*/
- toProtobuf(mq: MessageQueue) {
+ toProtobuf(namespace: string, mq: MessageQueue) {
const systemProperties = new SystemProperties()
.setKeysList(this.keys)
.setMessageId(this.messageId)
@@ -87,8 +87,10 @@
systemProperties.setMessageGroup(this.messageGroup);
}
+ const resource = createResource(this.topic);
+ resource.setResourceNamespace(namespace);
const message = new MessagePB()
- .setTopic(createResource(this.topic))
+ .setTopic(resource)
.setBody(this.body)
.setSystemProperties(systemProperties);
if (this.properties) {
diff --git a/nodejs/src/producer/Producer.ts b/nodejs/src/producer/Producer.ts
index 623ecf4..8484167 100644
--- a/nodejs/src/producer/Producer.ts
+++ b/nodejs/src/producer/Producer.ts
@@ -66,7 +66,7 @@
// https://rocketmq.apache.org/docs/introduction/03limits/
// Default max number of message sending retries is 3
const retryPolicy = ExponentialBackoffRetryPolicy.immediatelyRetryPolicy(options.maxAttempts ?? 3);
- this.#publishingSettings = new PublishingSettings(this.clientId, this.endpoints, retryPolicy,
+ this.#publishingSettings = new PublishingSettings(options.namespace, this.clientId, this.endpoints, retryPolicy,
this.requestTimeout, this.topics);
this.#checker = options.checker;
}
@@ -85,7 +85,7 @@
const request = new EndTransactionRequest()
.setMessageId(messageId)
.setTransactionId(transactionId)
- .setTopic(createResource(message.topic))
+ .setTopic(createResource(message.topic).setResourceNamespace(this.namespace))
.setResolution(resolution);
const response = await this.rpcClientManager.endTransaction(endpoints, request, this.requestTimeout);
StatusChecker.check(response.getStatus()?.toObject());
@@ -187,7 +187,11 @@
#wrapSendMessageRequest(pubMessages: PublishingMessage[], mq: MessageQueue) {
const request = new SendMessageRequest();
for (const pubMessage of pubMessages) {
- request.addMessages(pubMessage.toProtobuf(mq));
+ if (this.namespace) {
+ request.addMessages(pubMessage.toProtobuf(this.namespace, mq));
+ } else {
+ request.addMessages(pubMessage.toProtobuf('', mq));
+ }
}
return request;
}
diff --git a/nodejs/src/producer/PublishingSettings.ts b/nodejs/src/producer/PublishingSettings.ts
index fb72d66..8b41380 100644
--- a/nodejs/src/producer/PublishingSettings.ts
+++ b/nodejs/src/producer/PublishingSettings.ts
@@ -35,9 +35,8 @@
#maxBodySizeBytes = 4 * 1024 * 1024;
#validateMessageType = true;
- constructor(clientId: string, accessPoint: Endpoints, retryPolicy: ExponentialBackoffRetryPolicy,
- requestTimeout: number, topics: Set<string>) {
- super(clientId, ClientType.PRODUCER, accessPoint, requestTimeout, retryPolicy);
+ constructor(namespace: string, clientId: string, accessPoint: Endpoints, retryPolicy: ExponentialBackoffRetryPolicy, requestTimeout: number, topics: Set<string>) {
+ super(namespace, clientId, ClientType.PRODUCER, accessPoint, requestTimeout, retryPolicy);
this.#topics = topics;
}
@@ -54,6 +53,7 @@
.setValidateMessageType(this.#validateMessageType);
for (const topic of this.#topics) {
publishing.addTopics().setName(topic);
+ publishing.addTopics().setResourceNamespace(this.namespace);
}
return new SettingsPB()
.setClientType(this.clientType)
diff --git a/nodejs/test/consumer/SimpleConsumer.test.ts b/nodejs/test/consumer/SimpleConsumer.test.ts
index 8e375a9..3962a2e 100644
--- a/nodejs/test/consumer/SimpleConsumer.test.ts
+++ b/nodejs/test/consumer/SimpleConsumer.test.ts
@@ -21,7 +21,7 @@
SimpleConsumer, FilterExpression,
Producer,
} from '../../src';
-import { topics, endpoints, sessionCredentials } from '../helper';
+import { topics, endpoints, sessionCredentials, namespace } from '../helper';
describe('test/consumer/SimpleConsumer.test.ts', () => {
let producer: Producer | null = null;
@@ -42,6 +42,7 @@
if (!sessionCredentials) return;
simpleConsumer = new SimpleConsumer({
endpoints,
+ namespace,
sessionCredentials,
consumerGroup: 'nodejs-unittest-group',
subscriptions: new Map().set(topics.delay, FilterExpression.SUB_ALL),
@@ -53,6 +54,7 @@
if (!sessionCredentials) return;
simpleConsumer = new SimpleConsumer({
endpoints,
+ namespace,
sessionCredentials: {
...sessionCredentials,
accessKey: 'wrong',
@@ -69,6 +71,7 @@
if (!sessionCredentials) return;
simpleConsumer = new SimpleConsumer({
endpoints,
+ namespace,
sessionCredentials: {
...sessionCredentials,
accessSecret: 'wrong',
@@ -88,11 +91,13 @@
const tag = `nodejs-unittest-tag-${randomUUID()}`;
producer = new Producer({
endpoints,
- sessionCredentials,
+ namespace,
+ sessionCredentials
});
await producer.startup();
simpleConsumer = new SimpleConsumer({
endpoints,
+ namespace,
sessionCredentials,
consumerGroup: `nodejs-unittest-group-${randomUUID()}`,
subscriptions: new Map().set(topic, new FilterExpression(tag)),
diff --git a/nodejs/test/helper.ts b/nodejs/test/helper.ts
index 4a3deec..4cd23fa 100644
--- a/nodejs/test/helper.ts
+++ b/nodejs/test/helper.ts
@@ -18,6 +18,7 @@
import { SessionCredentials } from '../src/client';
export const endpoints = process.env.ROCKETMQ_NODEJS_CLIENT_ENDPOINTS ?? 'localhost:8081';
+export const namespace = process.env.ROCKETMQ_NODEJS_CLIENT_NAMESPACE ?? '';
export const topics = {
normal: 'TopicTestForNormal',
fifo: 'TopicTestForFifo',
diff --git a/nodejs/test/producer/Producer.test.ts b/nodejs/test/producer/Producer.test.ts
index 005cd3b..66deec7 100644
--- a/nodejs/test/producer/Producer.test.ts
+++ b/nodejs/test/producer/Producer.test.ts
@@ -19,7 +19,7 @@
import { randomUUID } from 'node:crypto';
import { NotFoundException, Producer, SimpleConsumer } from '../../src';
import { TransactionResolution } from '../../proto/apache/rocketmq/v2/definition_pb';
-import { topics, endpoints, sessionCredentials, consumerGroup } from '../helper';
+import { topics, endpoints, sessionCredentials, consumerGroup, namespace } from '../helper';
describe('test/producer/Producer.test.ts', () => {
let producer: Producer | null = null;
@@ -39,6 +39,7 @@
it('should startup success', async () => {
producer = new Producer({
endpoints,
+ namespace,
sessionCredentials,
maxAttempts: 2,
});
@@ -66,6 +67,7 @@
producer = new Producer({
topic: 'TopicTest-not-exists',
endpoints,
+ namespace,
sessionCredentials,
maxAttempts: 2,
});
@@ -87,6 +89,7 @@
const tag = `nodejs-unittest-tag-${randomUUID()}`;
producer = new Producer({
endpoints,
+ namespace,
sessionCredentials,
maxAttempts: 2,
});
@@ -108,6 +111,7 @@
simpleConsumer = new SimpleConsumer({
consumerGroup,
endpoints,
+ namespace,
sessionCredentials,
subscriptions: new Map().set(topic, tag),
awaitDuration: 3000,
@@ -124,6 +128,7 @@
const tag = `nodejs-unittest-tag-${randomUUID()}`;
producer = new Producer({
endpoints,
+ namespace,
sessionCredentials,
maxAttempts: 2,
});
@@ -147,6 +152,7 @@
simpleConsumer = new SimpleConsumer({
consumerGroup,
endpoints,
+ namespace,
sessionCredentials,
subscriptions: new Map().set(topic, tag),
awaitDuration: 3000,
@@ -166,6 +172,7 @@
const tag = `nodejs-unittest-tag-${randomUUID()}`;
producer = new Producer({
endpoints,
+ namespace,
sessionCredentials,
maxAttempts: 2,
});
@@ -173,6 +180,7 @@
simpleConsumer = new SimpleConsumer({
consumerGroup,
endpoints,
+ namespace,
sessionCredentials,
subscriptions: new Map().set(topic, tag),
awaitDuration: 3000,
@@ -238,6 +246,7 @@
const tag = `nodejs-unittest-tag-${randomUUID()}`;
producer = new Producer({
endpoints,
+ namespace,
sessionCredentials,
maxAttempts: 2,
checker: {
@@ -266,6 +275,7 @@
simpleConsumer = new SimpleConsumer({
consumerGroup,
endpoints,
+ namespace,
sessionCredentials,
subscriptions: new Map().set(topic, tag),
awaitDuration: 3000,