| var Stream = require('stream').Stream, |
| util = require('util'), |
| driver = require('websocket-driver'), |
| Headers = require('websocket-driver/lib/websocket/driver/headers'), |
| API = require('./websocket/api'), |
| EventTarget = require('./websocket/api/event_target'), |
| Event = require('./websocket/api/event'); |
| |
| var EventSource = function(request, response, options) { |
| this.writable = true; |
| options = options || {}; |
| |
| this._stream = response.socket; |
| this._ping = options.ping || this.DEFAULT_PING; |
| this._retry = options.retry || this.DEFAULT_RETRY; |
| |
| var scheme = driver.isSecureRequest(request) ? 'https:' : 'http:'; |
| this.url = scheme + '//' + request.headers.host + request.url; |
| this.lastEventId = request.headers['last-event-id'] || ''; |
| this.readyState = API.CONNECTING; |
| |
| var headers = new Headers(), |
| self = this; |
| |
| if (options.headers) { |
| for (var key in options.headers) headers.set(key, options.headers[key]); |
| } |
| |
| if (!this._stream || !this._stream.writable) return; |
| process.nextTick(function() { self._open() }); |
| |
| this._stream.setTimeout(0); |
| this._stream.setNoDelay(true); |
| |
| var handshake = 'HTTP/1.1 200 OK\r\n' + |
| 'Content-Type: text/event-stream\r\n' + |
| 'Cache-Control: no-cache, no-store\r\n' + |
| 'Connection: close\r\n' + |
| headers.toString() + |
| '\r\n' + |
| 'retry: ' + Math.floor(this._retry * 1000) + '\r\n\r\n'; |
| |
| this._write(handshake); |
| |
| this._stream.on('drain', function() { self.emit('drain') }); |
| |
| if (this._ping) |
| this._pingTimer = setInterval(function() { self.ping() }, this._ping * 1000); |
| |
| ['error', 'end'].forEach(function(event) { |
| self._stream.on(event, function() { self.close() }); |
| }); |
| }; |
| util.inherits(EventSource, Stream); |
| |
| EventSource.isEventSource = function(request) { |
| if (request.method !== 'GET') return false; |
| var accept = (request.headers.accept || '').split(/\s*,\s*/); |
| return accept.indexOf('text/event-stream') >= 0; |
| }; |
| |
| var instance = { |
| DEFAULT_PING: 10, |
| DEFAULT_RETRY: 5, |
| |
| _write: function(chunk) { |
| if (!this.writable) return false; |
| try { |
| return this._stream.write(chunk, 'utf8'); |
| } catch (e) { |
| return false; |
| } |
| }, |
| |
| _open: function() { |
| if (this.readyState !== API.CONNECTING) return; |
| |
| this.readyState = API.OPEN; |
| |
| var event = new Event('open'); |
| event.initEvent('open', false, false); |
| this.dispatchEvent(event); |
| }, |
| |
| write: function(message) { |
| return this.send(message); |
| }, |
| |
| end: function(message) { |
| if (message !== undefined) this.write(message); |
| this.close(); |
| }, |
| |
| send: function(message, options) { |
| if (this.readyState > API.OPEN) return false; |
| |
| message = String(message).replace(/(\r\n|\r|\n)/g, '$1data: '); |
| options = options || {}; |
| |
| var frame = ''; |
| if (options.event) frame += 'event: ' + options.event + '\r\n'; |
| if (options.id) frame += 'id: ' + options.id + '\r\n'; |
| frame += 'data: ' + message + '\r\n\r\n'; |
| |
| return this._write(frame); |
| }, |
| |
| ping: function() { |
| return this._write(':\r\n\r\n'); |
| }, |
| |
| close: function() { |
| if (this.readyState > API.OPEN) return false; |
| |
| this.readyState = API.CLOSED; |
| this.writable = false; |
| if (this._pingTimer) clearInterval(this._pingTimer); |
| if (this._stream) this._stream.end(); |
| |
| var event = new Event('close'); |
| event.initEvent('close', false, false); |
| this.dispatchEvent(event); |
| |
| return true; |
| } |
| }; |
| |
| for (var method in instance) EventSource.prototype[method] = instance[method]; |
| for (var key in EventTarget) EventSource.prototype[key] = EventTarget[key]; |
| |
| module.exports = EventSource; |