blob: 717eb4537b414d3c1652966d61ffc1c417747ba7 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
const commander = require('commander');
const delay = require('delay');
const {
performance,
} = require('perf_hooks');
const Pulsar = require('../index.js');
// Parse args
(() => {
commander
.option('-s, --subscription [subscription]', 'Subscription')
.option('-u, --url [url]', 'Pulsar Service URL')
.option('-t, --topic [topic]', 'Topic Name')
.option('-m, --messages [messages]', 'Number of Messages', 1000)
.option('-i, --iteration [iteration]', 'Iteration of Measure', 3, parseInt)
.on('--help', () => {
console.log('');
console.log('Examples:');
console.log(' $ node perf/perf_consumer.js --subscription sub1 --url pulsar://localhost:6650 --topic persistent://public/default/my-topic');
})
.parse(process.argv);
if (typeof commander.subscription === 'undefined') {
console.error('no subscription given!');
process.exit(1);
}
if (typeof commander.url === 'undefined') {
console.error('no URL given!');
process.exit(1);
}
if (typeof commander.topic === 'undefined') {
console.error('no topic name given!');
process.exit(1);
}
console.log('----------------------');
commander.options.forEach((option) => {
const optionName = (option.long).replace('--', '');
console.log(`${optionName}: ${commander[optionName]}`);
});
console.log('----------------------');
})();
(async () => {
// Create a client
const clientConfig = {
serviceUrl: commander.url,
};
const client = new Pulsar.Client(clientConfig);
// Create a consumer
const consumerConfig = {
topic: commander.topic,
subscription: commander.subscription,
};
const consumer = await client.subscribe(consumerConfig);
const numOfMessages = parseInt(commander.messages, 10);
for (let i = 0; i < commander.iteration; i += 1) {
// measure
await delay(1000);
const startTimeMilliSeconds = performance.now();
const results = [];
for (let mi = 0; mi < numOfMessages; mi += 1) {
results.push(new Promise((resolve) => {
consumer.receive().then((msg) => {
consumer.acknowledge(msg);
resolve(msg.getData().length);
});
}));
}
const messageSizes = await Promise.all(results);
const endTimeMilliSeconds = performance.now();
// result
const durationSeconds = (endTimeMilliSeconds - startTimeMilliSeconds) / 1000.0;
const rate = numOfMessages / durationSeconds;
const totalMessageSize = messageSizes.reduce((a, b) => a + b);
const throuhputMbit = (totalMessageSize / 1024 / 1024 * 8) / durationSeconds;
console.log('Throughput received: %f msg/s --- %f Mbit/s', rate.toFixed(3), throuhputMbit.toFixed(3));
}
await consumer.close();
await client.close();
})();