blob: e174727c4db2e008a0c1a9dd251876a88e220fea [file] [log] [blame]
'use strict';
const amqplib = require('amqplib');
function rabbitmqAppender(config, layout) {
const host = config.host || '127.0.0.1';
const port = config.port || 5672;
const username = config.username || 'guest';
const password = config.password || 'guest';
const exchange = config.exchange || '';
const type = config.mq_type || '';
const durable = config.durable || false;
const routingKey = config.routing_key || 'logstash';
const con = {
protocol: 'amqp',
hostname: host,
port: port,
username: username,
password: password,
locale: 'en_US',
frameMax: 0,
heartbeat: 0,
vhost: '/',
routing_key: routingKey,
exchange: exchange,
mq_type: type,
durable: durable,
};
const clientconn = amqplib.connect(con);
clientconn.publish = amqplib.connect(con).publish ? amqplib.connect(con).publish : (client, message) => {
client.then((conn) => {
const rn = conn.createChannel().then((ch) => {
const ok = ch.assertExchange(exchange, type, { durable: durable });
return ok.then(() => {
ch.publish(exchange, routingKey, Buffer.from(message));
return ch.close();
});
});
return rn;
}).catch(console.error);
};
function log(loggingEvent) {
const message = layout(loggingEvent);
clientconn.publish(clientconn, message);
}
log.shutdown = function () {
clientconn.close();
};
return log;
}
function configure(config, layouts) {
let layout = layouts.messagePassThroughLayout;
if (config.layout) {
layout = layouts.layout(config.layout.type, config.layout);
}
return rabbitmqAppender(config, layout);
}
module.exports.configure = configure;