| #!/usr/bin/env python |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| # use this file except in compliance with the License. You may obtain a copy of |
| # the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations under |
| # the License. |
| |
| import argparse |
| import base64 |
| import json |
| import textwrap |
| import threading |
| import time |
| import sys |
| try: |
| from urllib import quote |
| except ImportError: |
| from urllib.parse import quote |
| import requests |
| try: |
| import progressbar |
| HAVE_BAR = True |
| except ImportError: |
| HAVE_BAR = False |
| |
| def _tojson(req): |
| """Support requests v0.x as well as 1.x+""" |
| if requests.__version__[0] == '0': |
| return json.loads(req.content) |
| return req.json() |
| |
| def _args(args): |
| args = vars(args) |
| if args['password']: |
| args['creds'] = (args['login'], args['password']) |
| else: |
| args['creds'] = None |
| return args |
| |
| def _do_list(args): |
| port = str(args['local_port']) |
| req = requests.get('http://127.0.0.1:' + port + '/_all_dbs', |
| auth=args['creds']) |
| req.raise_for_status() |
| dbs = _tojson(req) |
| local_dbs = [x for x in dbs if "shards" not in x |
| and x not in ['_dbs', '_nodes']] |
| clustered_dbs = list(set( |
| [x.split('/')[2].split('.')[0] for x in dbs if "shards" in x] |
| )) |
| if not args['include_system_dbs']: |
| # list comprehension to eliminate dbs starting with underscore |
| local_dbs = [x for x in local_dbs if x[0] != '_'] |
| clustered_dbs = [x for x in clustered_dbs if x[0] != '_'] |
| local_dbs.sort() |
| clustered_dbs.sort() |
| if args.get('clustered'): |
| return clustered_dbs |
| return local_dbs |
| |
| def _list(args): |
| args = _args(args) |
| ret = _do_list(args) |
| print(", ".join(ret)) |
| |
| def _watch_replication(db, |
| local_port=5986, |
| clustered_port=5984, |
| creds=None, |
| hide_progress_bar=False, |
| quiet=False, |
| timeout=30): |
| """Watches replication, optionally with a progressbar.""" |
| time.sleep(1) |
| if not quiet: |
| print("Replication started.") |
| url = "http://127.0.0.1:{}/{}".format(local_port, db) |
| try: |
| req = requests.get(url, auth=creds) |
| req.raise_for_status() |
| req = _tojson(req) |
| # here, local means node-local, i.e. source (1.x) database |
| local_docs = req['doc_count'] |
| local_size = req['data_size'] |
| except requests.exceptions.HTTPError: |
| raise Exception('Cannot retrieve {} doc_count!'.format(db)) |
| if local_size == 0: |
| return |
| if HAVE_BAR and not hide_progress_bar and not quiet: |
| widgets = [ |
| db, |
| ' ', progressbar.Percentage(), |
| ' ', progressbar.Bar(marker=progressbar.RotatingMarker()), |
| ' ', progressbar.ETA(), |
| ' ', progressbar.FileTransferSpeed(), |
| ] |
| progbar = progressbar.ProgressBar(widgets=widgets, |
| maxval=local_size).start() |
| count = 0 |
| stall_count = 0 |
| url = "http://127.0.0.1:{}/{}".format(clustered_port, db) |
| while count < local_docs: |
| try: |
| req = requests.get(url, auth=creds) |
| req.raise_for_status() |
| req = _tojson(req) |
| # here, cluster means clustered port, i.e. port 5984 |
| clus_count = req['doc_count'] |
| clus_size = req['data_size'] |
| except requests.exceptions.HTTPError as exc: |
| if exc.response.status_code == 404: |
| clus_count = 0 |
| clus_size = 0 |
| else: |
| raise Exception('Cannot retrieve {} doc_count!'.format(db)) |
| if count == clus_count: |
| stall_count += 1 |
| else: |
| stall_count = 0 |
| if stall_count == timeout: |
| if not quiet: |
| print( |
| "Replication is stalled. Increase timeout or reduce load.") |
| exit(1) |
| if HAVE_BAR and not hide_progress_bar and not quiet: |
| if clus_size > local_size: |
| clus_size = local_size |
| progbar.update(clus_size) |
| count = clus_count |
| time.sleep(1) |
| if HAVE_BAR and not hide_progress_bar and not quiet: |
| progbar.finish() |
| return 0 |
| |
| def _put_filter(args, db=None): |
| """Adds _design/repl_filters tombstone replication filter to DB.""" |
| ddoc = { |
| '_id': '_design/repl_filters', |
| 'filters': { |
| 'no_deleted': 'function(doc,req){return !doc._deleted;};' |
| } |
| } |
| try: |
| req = requests.get( |
| 'http://127.0.0.1:{}/{}/_design/repl_filters'.format( |
| args['local_port'], db), |
| auth=args['creds']) |
| req.raise_for_status() |
| doc = _tojson(req) |
| del doc['_rev'] |
| if doc != ddoc: |
| if not args['quiet']: |
| print('Source replication filter does not match! Aborting.') |
| exit(1) |
| except requests.exceptions.HTTPError as exc: |
| if exc.response.status_code == 404: |
| if not args['quiet']: |
| print('Adding replication filter to source database...') |
| req = requests.put( |
| 'http://127.0.0.1:{}/{}/_design/repl_filters'.format( |
| args['local_port'], db), |
| data=json.dumps(ddoc), |
| auth=args['creds']) |
| req.raise_for_status() |
| elif not args['quiet']: |
| print(exc.response.text) |
| exit(1) |
| |
| def _replicate(args): |
| args = _args(args) |
| if args['all_dbs']: |
| dbs = _do_list(args) |
| else: |
| dbs = args['dbs'] |
| |
| for db in dbs: |
| if args['filter_deleted']: |
| _put_filter(args, db) |
| |
| if not args['quiet']: |
| print('Starting replication for ' + db + '...') |
| db = quote(db, safe='') |
| doc = { |
| 'continuous': False, |
| 'create_target': True, |
| 'source': { |
| 'url': 'http://127.0.0.1:{}/{}'.format( |
| args['local_port'], db) |
| }, |
| 'target': { |
| 'url': 'http://127.0.0.1:{}/{}'.format( |
| args['clustered_port'], db) |
| } |
| } |
| if args['filter_deleted']: |
| doc['filter'] = 'repl_filters/no_deleted' |
| if args['creds']: |
| auth = 'Basic ' + base64.b64encode(':'.join(args['creds'])) |
| headers = { |
| 'authorization': auth |
| } |
| doc['source']['headers'] = headers |
| doc['target']['headers'] = headers |
| watch_args = {y: args[y] for y in [ |
| 'local_port', 'clustered_port', 'creds', 'hide_progress_bar', |
| 'timeout', 'quiet']} |
| watch_args['db'] = db |
| watch = threading.Thread(target=_watch_replication, kwargs=watch_args) |
| watch.start() |
| try: |
| req = requests.post('http://127.0.0.1:{}/_replicate'.format( |
| args['clustered_port']), |
| auth=args['creds'], |
| data=json.dumps(doc), |
| headers={'Content-type': 'application/json'}) |
| req.raise_for_status() |
| req = _tojson(req) |
| except requests.exceptions.HTTPError as exc: |
| if not args['quiet']: |
| print(exc.response.text) |
| exit(1) |
| watch.join() |
| if req.get('no_changes'): |
| if not args['quiet']: |
| print("No changes, replication is caught up.") |
| if not args['quiet']: |
| print("Replication complete.") |
| |
| def _rebuild(args): |
| args = _args(args) |
| if args['all_dbs']: |
| if args['views']: |
| if not args['quiet']: |
| print("Cannot take list of views for more than 1 database.") |
| exit(1) |
| args['clustered'] = True |
| dbs = _do_list(args) |
| else: |
| dbs = [args['db']] |
| for db in dbs: |
| if args['views']: |
| views = args['views'] |
| else: |
| try: |
| req = requests.get('http://127.0.0.1:{}/{}/_all_docs'.format( |
| args['clustered_port'], db), |
| params={ |
| 'start_key': '"_design/"', |
| 'end_key': '"_design0"' |
| }, |
| auth=args['creds']) |
| req.raise_for_status() |
| req = _tojson(req) |
| except requests.exceptions.HTTPError as exc: |
| if not args['quiet']: |
| print(exc.response.text) |
| exit(1) |
| req = req['rows'] |
| ddocs = [x['id'].split('/')[1] for x in req] |
| for ddoc in ddocs: |
| try: |
| req = requests.get('http://127.0.0.1:{}/{}/_design/{}'.format( |
| args['clustered_port'], db, ddoc), |
| auth=args['creds']) |
| req.raise_for_status() |
| doc = _tojson(req) |
| except requests.exceptions.HTTPError as exc: |
| if not args['quiet']: |
| print(exc.response.text) |
| exit(1) |
| if 'views' not in doc: |
| if not args['quiet']: |
| print("Skipping {}/{}, no views found".format(db, ddoc)) |
| continue |
| # only need to refresh a single view per ddoc |
| if not args['quiet']: |
| print("Refreshing views in {}/{}...".format(db, ddoc)) |
| view = list(doc['views'].keys())[0] |
| try: |
| req = requests.get( |
| 'http://127.0.0.1:{}/{}/_design/{}/_view/{}'.format( |
| args['clustered_port'], db, ddoc, view), |
| params={'limit': 1}, |
| auth=args['creds'], |
| timeout=float(args['timeout'])) |
| except requests.exceptions.Timeout: |
| if not args['quiet']: |
| print("Timeout, view is processing. Moving on.") |
| except requests.exceptions.HTTPError as exc: |
| if not args['quiet']: |
| print(exc.response.text) |
| exit(1) |
| |
| def _delete(args): |
| args = _args(args) |
| if args['all_dbs']: |
| args['include_system_dbs'] = False |
| dbs = _do_list(args) |
| else: |
| dbs = args['dbs'] |
| for db in dbs: |
| db = quote(db, safe='') |
| local_url = 'http://127.0.0.1:{}/{}'.format(args['local_port'], db) |
| clus_url = 'http://127.0.0.1:{}/{}'.format(args['clustered_port'], db) |
| try: |
| req = requests.get(local_url, auth=args['creds']) |
| req.raise_for_status() |
| req = _tojson(req) |
| local_docs = req['doc_count'] |
| req = requests.get(clus_url, auth=args['creds']) |
| req.raise_for_status() |
| req = _tojson(req) |
| clus_docs = req['doc_count'] |
| if clus_docs < local_docs and not args['force']: |
| if not args['quiet']: |
| print('Clustered DB has less docs than local version!' + |
| ' Skipping...') |
| continue |
| if not args['quiet']: |
| print('Deleting ' + db + '...') |
| req = requests.delete('http://127.0.0.1:{}/{}'.format( |
| args['local_port'], db), |
| auth=args['creds']) |
| req.raise_for_status() |
| except requests.exceptions.HTTPError as exc: |
| if not args['quiet']: |
| print(exc.response.text) |
| exit(1) |
| |
| def main(argv): |
| """Kindly do the needful.""" |
| parser = argparse.ArgumentParser(prog='couchup', |
| formatter_class=argparse.RawDescriptionHelpFormatter, |
| description=textwrap.dedent('''\ |
| Migrate CouchDB 1.x databases to CouchDB 2.x. |
| |
| Specify a subcommand and -h or --help for more help. |
| ''')) |
| |
| subparsers = parser.add_subparsers() |
| |
| parser_list = subparsers.add_parser('list', |
| help='lists all CouchDB 1.x databases', |
| formatter_class=argparse.RawTextHelpFormatter, |
| description=textwrap.dedent('''\ |
| Examples: |
| couchup list |
| couchup list -c -i -p mysecretpassword |
| ''')) |
| parser_list.add_argument('-c', '--clustered', action='store_true', |
| help='show clustered (2.x) databases instead') |
| parser_list.add_argument('-i', '--include-system-dbs', |
| action='store_true', |
| help='include system databases (_users, _replicator, etc.)') |
| parser_list.add_argument('-l', '--login', default='admin', |
| help='specify login (default admin)') |
| parser_list.add_argument('-p', '--password', |
| help='specify password') |
| parser_list.add_argument('--local-port', default=5986, |
| help='override local port (default 5986)') |
| parser_list.add_argument('--clustered-port', default=5984, |
| help='override clustered port (default 5984)') |
| parser_list.set_defaults(func=_list) |
| |
| parser_replicate = subparsers.add_parser('replicate', |
| help='replicates one or more 1.x databases to CouchDB 2.x', |
| formatter_class=argparse.RawTextHelpFormatter, |
| description=textwrap.dedent('''\ |
| Examples: |
| couchup replicate movies |
| couchup replicate -f lots_of_deleted_docs_db |
| couchup replicate -i -q -n _users |
| |
| Note: |
| The -f/--filter-deleted option adds a replication filter |
| to the source database, _design/repl_filters, that |
| is used during replication to filter out deleted |
| documents. This can greatly reduce the size of your |
| 2.x database if there are many deleted documents. |
| |
| It is IMPORTANT that no documents be deleted from the 1.x |
| database during this process, or those deletions may not |
| successfully replicate to the 2.x database. |
| ''')) |
| parser_replicate.add_argument('-a', '--all-dbs', action='store_true', |
| help='act on all databases available') |
| parser_replicate.add_argument('-i', '--include-system-dbs', |
| action='store_true', |
| help='include system databases (_users, _replicator, etc.)') |
| parser_replicate.add_argument('-q', '--quiet', action='store_true', |
| help='suppress all output') |
| parser_replicate.add_argument('-n', '--hide-progress-bar', |
| action='store_true', |
| help='suppress progress bar display') |
| parser_replicate.add_argument('-f', '--filter-deleted', |
| action='store_true', |
| help='filter deleted document tombstones during replication') |
| parser_replicate.add_argument('-t', '--timeout', default=30, |
| help='stalled replication timeout threshhold in s (def: 30)') |
| parser_replicate.add_argument('-l', '--login', default='admin', |
| help='specify login (default admin)') |
| parser_replicate.add_argument('-p', '--password', |
| help='specify password') |
| parser_replicate.add_argument('--local-port', default=5986, |
| help='override local port (default 5986)') |
| parser_replicate.add_argument('--clustered-port', default=5984, |
| help='override clustered port (default 5984)') |
| parser_replicate.add_argument('dbs', metavar='db', type=str, nargs="*", |
| help="database(s) to be processed") |
| parser_replicate.set_defaults(func=_replicate) |
| |
| parser_rebuild = subparsers.add_parser('rebuild', |
| help='rebuilds one or more CouchDB 2.x views', |
| formatter_class=argparse.RawTextHelpFormatter, |
| description=textwrap.dedent('''\ |
| Examples: |
| couchup rebuild movies |
| couchup rebuild movies by_name |
| couchup rebuild -a -q -p mysecretpassword |
| ''')) |
| parser_rebuild.add_argument('-a', '--all-dbs', action='store_true', |
| help='act on all databases available') |
| parser_rebuild.add_argument('-q', '--quiet', action='store_true', |
| help='suppress all output') |
| parser_rebuild.add_argument('-t', '--timeout', default=5, |
| help='timeout for waiting for view rebuild in s (default: 5)') |
| parser_rebuild.add_argument('-i', '--include-system-dbs', |
| action='store_true', |
| help='include system databases (_users, _replicator, etc.)') |
| parser_rebuild.add_argument('-l', '--login', default='admin', |
| help='specify login (default admin)') |
| parser_rebuild.add_argument('-p', '--password', |
| help='specify password') |
| parser_rebuild.add_argument('--local-port', default=5986, |
| help='override local port (default 5986)') |
| parser_rebuild.add_argument('--clustered-port', default=5984, |
| help='override clustered port (default 5984)') |
| parser_rebuild.add_argument('db', metavar='db', type=str, nargs="?", |
| help="database to be processed") |
| parser_rebuild.add_argument('views', metavar='view', type=str, nargs="*", |
| help="view(s) to be processed (all by default)") |
| parser_rebuild.set_defaults(func=_rebuild) |
| |
| parser_delete = subparsers.add_parser('delete', |
| help='deletes one or more CouchDB 1.x databases', |
| formatter_class=argparse.RawTextHelpFormatter, |
| description=textwrap.dedent('''\ |
| Examples: |
| couchup delete movies |
| couchup delete -q -p mysecretpassword movies |
| ''')) |
| parser_delete.add_argument('-a', '--all-dbs', action='store_true', |
| help='act on all databases available') |
| parser_delete.add_argument('-f', '--force', action='store_true', |
| help='force deletion even if 1.x and 2.x databases are not identical') |
| parser_delete.add_argument('-q', '--quiet', action='store_true', |
| help='suppress all output') |
| parser_delete.add_argument('-l', '--login', default='admin', |
| help='specify login (default admin)') |
| parser_delete.add_argument('-p', '--password', |
| help='specify password') |
| parser_delete.add_argument('--local-port', default=5986, |
| help='override local port (default 5986)') |
| parser_delete.add_argument('--clustered-port', default=5984, |
| help='override clustered port (default 5984)') |
| parser_delete.add_argument('dbs', metavar='db', type=str, nargs="*", |
| help="database(s) to be processed") |
| parser_delete.set_defaults(func=_delete) |
| |
| args = parser.parse_args(argv[1:]) |
| args.func(args) |
| |
| if __name__ == '__main__': |
| main(sys.argv) |