| import jsExtend from 'js-extend'; var extend = jsExtend.extend; |
| import Promise from 'pouchdb-promise'; |
| import { |
| replicate, |
| toPouch |
| } from './replicateWrapper'; |
| import inherits from 'inherits'; |
| import { EventEmitter as EE } from 'events'; |
| import { clone } from 'pouchdb-utils'; |
| |
| inherits(Sync, EE); |
| export default sync; |
| function sync(src, target, opts, callback) { |
| if (typeof opts === 'function') { |
| callback = opts; |
| opts = {}; |
| } |
| if (typeof opts === 'undefined') { |
| opts = {}; |
| } |
| opts = clone(opts); |
| /*jshint validthis:true */ |
| opts.PouchConstructor = opts.PouchConstructor || this; |
| src = toPouch(src, opts); |
| target = toPouch(target, opts); |
| return new Sync(src, target, opts, callback); |
| } |
| |
| function Sync(src, target, opts, callback) { |
| var self = this; |
| this.canceled = false; |
| |
| var optsPush = opts.push ? extend({}, opts, opts.push) : opts; |
| var optsPull = opts.pull ? extend({}, opts, opts.pull) : opts; |
| |
| this.push = replicate(src, target, optsPush); |
| this.pull = replicate(target, src, optsPull); |
| |
| this.pushPaused = true; |
| this.pullPaused = true; |
| |
| function pullChange(change) { |
| self.emit('change', { |
| direction: 'pull', |
| change: change |
| }); |
| } |
| function pushChange(change) { |
| self.emit('change', { |
| direction: 'push', |
| change: change |
| }); |
| } |
| function pushDenied(doc) { |
| self.emit('denied', { |
| direction: 'push', |
| doc: doc |
| }); |
| } |
| function pullDenied(doc) { |
| self.emit('denied', { |
| direction: 'pull', |
| doc: doc |
| }); |
| } |
| function pushPaused() { |
| self.pushPaused = true; |
| if (self.pullPaused) { |
| self.emit('paused'); |
| } |
| } |
| function pullPaused() { |
| self.pullPaused = true; |
| if (self.pushPaused) { |
| self.emit('paused'); |
| } |
| } |
| function pushActive() { |
| self.pushPaused = false; |
| if (self.pullPaused) { |
| self.emit('active', { |
| direction: 'push' |
| }); |
| } |
| } |
| function pullActive() { |
| self.pullPaused = false; |
| /* istanbul ignore if */ |
| if (self.pushPaused) { |
| self.emit('active', { |
| direction: 'pull' |
| }); |
| } |
| } |
| |
| var removed = {}; |
| |
| function removeAll(type) { // type is 'push' or 'pull' |
| return function (event, func) { |
| var isChange = event === 'change' && |
| (func === pullChange || func === pushChange); |
| var isDenied = event === 'denied' && |
| (func === pullDenied || func === pushDenied); |
| var isPaused = event === 'paused' && |
| (func === pullPaused || func === pushPaused); |
| var isActive = event === 'active' && |
| (func === pullActive || func === pushActive); |
| |
| if (isChange || isDenied || isPaused || isActive) { |
| if (!(event in removed)) { |
| removed[event] = {}; |
| } |
| removed[event][type] = true; |
| if (Object.keys(removed[event]).length === 2) { |
| // both push and pull have asked to be removed |
| self.removeAllListeners(event); |
| } |
| } |
| }; |
| } |
| |
| if (opts.live) { |
| this.push.on('complete', self.pull.cancel.bind(self.pull)); |
| this.pull.on('complete', self.push.cancel.bind(self.push)); |
| } |
| |
| this.on('newListener', function (event) { |
| if (event === 'change') { |
| self.pull.on('change', pullChange); |
| self.push.on('change', pushChange); |
| } else if (event === 'denied') { |
| self.pull.on('denied', pullDenied); |
| self.push.on('denied', pushDenied); |
| } else if (event === 'active') { |
| self.pull.on('active', pullActive); |
| self.push.on('active', pushActive); |
| } else if (event === 'paused') { |
| self.pull.on('paused', pullPaused); |
| self.push.on('paused', pushPaused); |
| } |
| }); |
| |
| this.on('removeListener', function (event) { |
| if (event === 'change') { |
| self.pull.removeListener('change', pullChange); |
| self.push.removeListener('change', pushChange); |
| } else if (event === 'denied') { |
| self.pull.removeListener('denied', pullDenied); |
| self.push.removeListener('denied', pushDenied); |
| } else if (event === 'active') { |
| self.pull.removeListener('active', pullActive); |
| self.push.removeListener('active', pushActive); |
| } else if (event === 'paused') { |
| self.pull.removeListener('paused', pullPaused); |
| self.push.removeListener('paused', pushPaused); |
| } |
| }); |
| |
| this.pull.on('removeListener', removeAll('pull')); |
| this.push.on('removeListener', removeAll('push')); |
| |
| var promise = Promise.all([ |
| this.push, |
| this.pull |
| ]).then(function (resp) { |
| var out = { |
| push: resp[0], |
| pull: resp[1] |
| }; |
| self.emit('complete', out); |
| if (callback) { |
| callback(null, out); |
| } |
| self.removeAllListeners(); |
| return out; |
| }, function (err) { |
| self.cancel(); |
| if (callback) { |
| // if there's a callback, then the callback can receive |
| // the error event |
| callback(err); |
| } else { |
| // if there's no callback, then we're safe to emit an error |
| // event, which would otherwise throw an unhandled error |
| // due to 'error' being a special event in EventEmitters |
| self.emit('error', err); |
| } |
| self.removeAllListeners(); |
| if (callback) { |
| // no sense throwing if we're already emitting an 'error' event |
| throw err; |
| } |
| }); |
| |
| this.then = function (success, err) { |
| return promise.then(success, err); |
| }; |
| |
| this.catch = function (err) { |
| return promise.catch(err); |
| }; |
| } |
| |
| Sync.prototype.cancel = function () { |
| if (!this.canceled) { |
| this.canceled = true; |
| this.push.cancel(); |
| this.pull.cancel(); |
| } |
| }; |