| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| "use strict"; |
| |
| require("./env_init"); |
| |
| const assert = require("assert"); |
| const EventEmitter = require("events").EventEmitter; |
| |
| const common = require("./common"); |
| |
| const binding = common.requireBinding("rocketmq"); |
| |
| const START_OR_SHUTDOWN = Symbol("RocketMQPushConsumer#startOrShutdown"); |
| const START_STATUS = { |
| STOPPED: 0, |
| STARTED: 1, |
| STOPPING: 2, |
| STARTING: 3 |
| }; |
| const OPTIONS_LOG_LEVEL = { |
| FATAL: 1, |
| ERROR: 2, |
| WARN: 3, |
| INFO: 4, |
| DEBUG: 5, |
| TRACE: 6, |
| NUM: 7 |
| }; |
| |
| const DEFAULT_OPTIONS = {}; |
| |
| let producerRef = 0; |
| let timer; |
| |
| class RocketMQPushConsumer extends EventEmitter { |
| /** |
| * RocketMQ PushConsumer constructor |
| * @param {String} groupId the group id |
| * @param {String} [instanceName] the instance name |
| * @param {Object} options the options |
| */ |
| constructor(groupId, instanceName, options) { |
| super(); |
| |
| if(typeof instanceName !== "string" && !options) { |
| options = instanceName; |
| instanceName = null; |
| } |
| |
| options = Object.assign({}, DEFAULT_OPTIONS, options || {}); |
| if(options.logLevel && typeof options.logLevel === "string") { |
| options.logLevel = OPTIONS_LOG_LEVEL[options.logLevel.toUpperCase()] || OPTIONS_LOG_LEVEL.INFO; |
| } |
| this.core = new binding.PushConsumer(groupId, instanceName, options); |
| this.core.setListener(this.emit.bind(this, "message")); |
| this.status = START_STATUS.STOPPED; |
| } |
| |
| [START_OR_SHUTDOWN](method, callback) { |
| let promise; |
| let resolve; |
| let reject; |
| if(!callback) { |
| promise = new Promise((_resolve, _reject) => { |
| resolve = _resolve; |
| reject = _reject; |
| }); |
| } else { |
| resolve = reject = callback; |
| } |
| |
| this.core[method]((err, ret) => { |
| if(err) return reject(err); |
| |
| if(method === "start") { |
| this.status = START_STATUS.STARTED; |
| if(!producerRef) timer = setInterval(function() {}, 24 * 3600 * 1000); |
| producerRef++; |
| } else { |
| this.status = START_STATUS.STOPPED; |
| producerRef--; |
| if(!producerRef) clearInterval(timer); |
| } |
| |
| return callback ? resolve(undefined, ret) : resolve(ret); |
| }); |
| |
| if(!callback) return promise; |
| } |
| |
| /** |
| * Start the push consumer |
| * @param {Function} [callback] the callback function |
| * @return {Promise|undefined} returns a Promise if no callback |
| */ |
| start(callback) { |
| assert(this.status === START_STATUS.STOPPED); |
| this.status = START_STATUS.STARTING; |
| return this[START_OR_SHUTDOWN]("start", callback); |
| } |
| |
| /** |
| * Shutdown the push consumer |
| * @param {Function} [callback] the callback function |
| * @return {Promise|undefined} returns a Promise if no callback |
| */ |
| shutdown(callback) { |
| assert(this.status === START_STATUS.STARTED); |
| this.status = START_STATUS.STOPPING; |
| return this[START_OR_SHUTDOWN]("shutdown", callback); |
| } |
| |
| /** |
| * subscribe a topic |
| * @param {String} topic the topic to be subscribed |
| * @param {String} [expression] the additional expression to be subscribed |
| * @return {Number} the subscribe status result |
| */ |
| subscribe(topic, expression) { |
| assert(this.status === START_STATUS.STOPPED); |
| assert(topic && typeof topic === "string"); |
| assert(!expression || expression && typeof expression === "string"); |
| return this.core.subscribe(topic, expression || ""); |
| } |
| } |
| |
| module.exports = RocketMQPushConsumer; |