| 'use strict'; |
| |
| // FIXME: |
| // replace this Transform mess with a method that pipes input argument to output argument |
| |
| const MessageParser = require('./message-parser'); |
| const RelaxedBody = require('./relaxed-body'); |
| const sign = require('./sign'); |
| const PassThrough = require('stream').PassThrough; |
| const fs = require('fs'); |
| const path = require('path'); |
| const crypto = require('crypto'); |
| |
| const DKIM_ALGO = 'sha256'; |
| const MAX_MESSAGE_SIZE = 128 * 1024; // buffer messages larger than this to disk |
| |
| /* |
| // Usage: |
| |
| let dkim = new DKIM({ |
| domainName: 'example.com', |
| keySelector: 'key-selector', |
| privateKey, |
| cacheDir: '/tmp' |
| }); |
| dkim.sign(input).pipe(process.stdout); |
| |
| // Where inputStream is a rfc822 message (either a stream, string or Buffer) |
| // and outputStream is a DKIM signed rfc822 message |
| */ |
| |
| class DKIMSigner { |
| constructor(options, keys, input, output) { |
| this.options = options || {}; |
| this.keys = keys; |
| |
| this.cacheTreshold = Number(this.options.cacheTreshold) || MAX_MESSAGE_SIZE; |
| this.hashAlgo = this.options.hashAlgo || DKIM_ALGO; |
| |
| this.cacheDir = this.options.cacheDir || false; |
| |
| this.chunks = []; |
| this.chunklen = 0; |
| this.readPos = 0; |
| this.cachePath = this.cacheDir ? path.join(this.cacheDir, 'message.' + Date.now() + '-' + crypto.randomBytes(14).toString('hex')) : false; |
| this.cache = false; |
| |
| this.headers = false; |
| this.bodyHash = false; |
| this.parser = false; |
| this.relaxedBody = false; |
| |
| this.input = input; |
| this.output = output; |
| this.output.usingCache = false; |
| |
| this.errored = false; |
| |
| this.input.on('error', err => { |
| this.errored = true; |
| this.cleanup(); |
| output.emit('error', err); |
| }); |
| } |
| |
| cleanup() { |
| if (!this.cache || !this.cachePath) { |
| return; |
| } |
| fs.unlink(this.cachePath, () => false); |
| } |
| |
| createReadCache() { |
| // pipe remainings to cache file |
| this.cache = fs.createReadStream(this.cachePath); |
| this.cache.once('error', err => { |
| this.cleanup(); |
| this.output.emit('error', err); |
| }); |
| this.cache.once('close', () => { |
| this.cleanup(); |
| }); |
| this.cache.pipe(this.output); |
| } |
| |
| sendNextChunk() { |
| if (this.errored) { |
| return; |
| } |
| |
| if (this.readPos >= this.chunks.length) { |
| if (!this.cache) { |
| return this.output.end(); |
| } |
| return this.createReadCache(); |
| } |
| let chunk = this.chunks[this.readPos++]; |
| if (this.output.write(chunk) === false) { |
| return this.output.once('drain', () => { |
| this.sendNextChunk(); |
| }); |
| } |
| setImmediate(() => this.sendNextChunk()); |
| } |
| |
| sendSignedOutput() { |
| let keyPos = 0; |
| let signNextKey = () => { |
| if (keyPos >= this.keys.length) { |
| this.output.write(this.parser.rawHeaders); |
| return setImmediate(() => this.sendNextChunk()); |
| } |
| let key = this.keys[keyPos++]; |
| let dkimField = sign(this.headers, this.hashAlgo, this.bodyHash, { |
| domainName: key.domainName, |
| keySelector: key.keySelector, |
| privateKey: key.privateKey, |
| headerFieldNames: this.options.headerFieldNames, |
| skipFields: this.options.skipFields |
| }); |
| if (dkimField) { |
| this.output.write(Buffer.from(dkimField + '\r\n')); |
| } |
| return setImmediate(signNextKey); |
| }; |
| |
| if (this.bodyHash && this.headers) { |
| return signNextKey(); |
| } |
| |
| this.output.write(this.parser.rawHeaders); |
| this.sendNextChunk(); |
| } |
| |
| createWriteCache() { |
| this.output.usingCache = true; |
| // pipe remainings to cache file |
| this.cache = fs.createWriteStream(this.cachePath); |
| this.cache.once('error', err => { |
| this.cleanup(); |
| // drain input |
| this.relaxedBody.unpipe(this.cache); |
| this.relaxedBody.on('readable', () => { |
| while (this.relaxedBody.read() !== null) { |
| // do nothing |
| } |
| }); |
| this.errored = true; |
| // emit error |
| this.output.emit('error', err); |
| }); |
| this.cache.once('close', () => { |
| this.sendSignedOutput(); |
| }); |
| this.relaxedBody.pipe(this.cache); |
| } |
| |
| signStream() { |
| this.parser = new MessageParser(); |
| this.relaxedBody = new RelaxedBody({ |
| hashAlgo: this.hashAlgo |
| }); |
| |
| this.parser.on('headers', value => { |
| this.headers = value; |
| }); |
| |
| this.relaxedBody.on('hash', value => { |
| this.bodyHash = value; |
| }); |
| |
| this.relaxedBody.on('readable', () => { |
| let chunk; |
| if (this.cache) { |
| return; |
| } |
| while ((chunk = this.relaxedBody.read()) !== null) { |
| this.chunks.push(chunk); |
| this.chunklen += chunk.length; |
| if (this.chunklen >= this.cacheTreshold && this.cachePath) { |
| return this.createWriteCache(); |
| } |
| } |
| }); |
| |
| this.relaxedBody.on('end', () => { |
| if (this.cache) { |
| return; |
| } |
| this.sendSignedOutput(); |
| }); |
| |
| this.parser.pipe(this.relaxedBody); |
| setImmediate(() => this.input.pipe(this.parser)); |
| } |
| } |
| |
| class DKIM { |
| constructor(options) { |
| this.options = options || {}; |
| this.keys = [].concat(this.options.keys || { |
| domainName: options.domainName, |
| keySelector: options.keySelector, |
| privateKey: options.privateKey |
| }); |
| } |
| |
| sign(input, extraOptions) { |
| let output = new PassThrough(); |
| let inputStream = input; |
| let writeValue = false; |
| |
| if (Buffer.isBuffer(input)) { |
| writeValue = input; |
| inputStream = new PassThrough(); |
| } else if (typeof input === 'string') { |
| writeValue = Buffer.from(input); |
| inputStream = new PassThrough(); |
| } |
| |
| let options = this.options; |
| if (extraOptions && Object.keys(extraOptions).length) { |
| options = {}; |
| Object.keys(this.options || {}).forEach(key => { |
| options[key] = this.options[key]; |
| }); |
| Object.keys(extraOptions || {}).forEach(key => { |
| if (!(key in options)) { |
| options[key] = extraOptions[key]; |
| } |
| }); |
| } |
| |
| let signer = new DKIMSigner(options, this.keys, inputStream, output); |
| setImmediate(() => { |
| signer.signStream(); |
| if (writeValue) { |
| setImmediate(() => { |
| inputStream.end(writeValue); |
| }); |
| } |
| }); |
| |
| return output; |
| } |
| } |
| |
| module.exports = DKIM; |