| #!/usr/bin/env python3 |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| # use this file except in compliance with the License. You may obtain a copy of |
| # the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations under |
| # the License. |
| |
| import argparse |
| import base64 |
| import json |
| import textwrap |
| import threading |
| import time |
| import sys |
| |
| try: |
| from urllib.parse import quote |
| except ImportError: |
| from urllib import quote |
| import requests |
| |
| try: |
| import progressbar |
| |
| HAVE_BAR = True |
| except ImportError: |
| HAVE_BAR = False |
| |
| |
def _tojson(req):
    """Support requests v0.x as well as 1.x+"""
    # requests 0.x Response objects have no .json() method
    if requests.__version__.startswith("0"):
        return json.loads(req.content)
    return req.json()
| |
| |
| def _args(args): |
| args = vars(args) |
| if args["password"]: |
| args["creds"] = (args["login"], args["password"]) |
| else: |
| args["creds"] = None |
| return args |
| |
| |
def _do_list(args):
    """Return a sorted list of database names from the local node.

    Node-local (1.x) databases appear as plain names; clustered (2.x)
    databases are inferred from their "shards/..." files.  Which list is
    returned depends on args["clustered"].
    """
    url = "http://127.0.0.1:{}/_all_dbs".format(args["local_port"])
    req = requests.get(url, auth=args["creds"])
    req.raise_for_status()
    all_dbs = _tojson(req)
    local_dbs = [
        db for db in all_dbs if "shards" not in db and db not in ("_dbs", "_nodes")
    ]
    # shard paths look like shards/<range>/<dbname>.<timestamp>
    clustered_dbs = list(
        {db.split("/")[2].split(".")[0] for db in all_dbs if "shards" in db}
    )
    if not args["include_system_dbs"]:
        # drop system databases (names starting with an underscore)
        local_dbs = [db for db in local_dbs if not db.startswith("_")]
        clustered_dbs = [db for db in clustered_dbs if not db.startswith("_")]
    local_dbs.sort()
    clustered_dbs.sort()
    return clustered_dbs if args.get("clustered") else local_dbs
| |
| |
def _list(args):
    """CLI handler for the "list" subcommand: print database names."""
    opts = _args(args)
    print(", ".join(_do_list(opts)))
| |
| |
def _watch_replication(
    db,
    local_port=5986,
    clustered_port=5984,
    creds=None,
    hide_progress_bar=False,
    quiet=False,
    timeout=30,
):
    """Watches replication, optionally with a progressbar.

    Polls the clustered (target) copy of ``db`` once per second and compares
    its doc_count against the node-local (source) copy until they match.
    Runs inside a worker thread started by _replicate().

    :param db: URL-encoded database name.
    :param timeout: consecutive stalled polls tolerated before giving up.
    """
    # Give the replication job a moment to start before polling.
    time.sleep(1)
    if not quiet:
        print("Replication started.")
    url = "http://127.0.0.1:{}/{}".format(local_port, db)
    try:
        req = requests.get(url, auth=creds)
        req.raise_for_status()
        req = _tojson(req)
        # here, local means node-local, i.e. source (1.x) database
        local_docs = req["doc_count"]
        local_size = req["data_size"]
    except requests.exceptions.HTTPError:
        raise Exception("Cannot retrieve {} doc_count!".format(db))
    if local_size == 0:
        # Empty source database: nothing to watch.
        return
    if HAVE_BAR and not hide_progress_bar and not quiet:
        widgets = [
            db,
            " ",
            progressbar.Percentage(),
            " ",
            progressbar.Bar(marker=progressbar.RotatingMarker()),
            " ",
            progressbar.ETA(),
            " ",
            progressbar.FileTransferSpeed(),
        ]
        # Bar is scaled by data_size, not doc_count.
        progbar = progressbar.ProgressBar(widgets=widgets, maxval=local_size).start()
    count = 0
    stall_count = 0
    url = "http://127.0.0.1:{}/{}".format(clustered_port, db)
    while count < local_docs:
        try:
            req = requests.get(url, auth=creds)
            req.raise_for_status()
            req = _tojson(req)
            # here, cluster means clustered port, i.e. port 5984
            clus_count = req["doc_count"]
            clus_size = req["data_size"]
        except requests.exceptions.HTTPError as exc:
            # 404: target database not created yet; treat as empty.
            if exc.response.status_code == 404:
                clus_count = 0
                clus_size = 0
            else:
                raise Exception("Cannot retrieve {} doc_count!".format(db))
        if count == clus_count:
            stall_count += 1
        else:
            stall_count = 0
        if stall_count == timeout:
            if not quiet:
                print("Replication is stalled. Increase timeout or reduce load.")
            # NOTE(review): exit() in a non-main thread raises SystemExit in
            # this thread only; the main process keeps running — confirm intent.
            exit(1)
        if HAVE_BAR and not hide_progress_bar and not quiet:
            # Target size can temporarily exceed source; clamp for the bar.
            if clus_size > local_size:
                clus_size = local_size
            progbar.update(clus_size)
        count = clus_count
        time.sleep(1)
    if HAVE_BAR and not hide_progress_bar and not quiet:
        progbar.finish()
    return 0
| |
| |
def _put_filter(args, db=None):
    """Adds _design/repl_filters tombstone replication filter to DB.

    If the design document already exists it must match exactly, otherwise
    the migration aborts; on a 404 the filter document is created.
    """
    ddoc = {
        "_id": "_design/repl_filters",
        "filters": {"no_deleted": "function(doc,req){return !doc._deleted;};"},
    }
    try:
        req = requests.get(
            "http://127.0.0.1:{}/{}/_design/repl_filters".format(
                args["local_port"], db
            ),
            auth=args["creds"],
        )
        req.raise_for_status()
        doc = _tojson(req)
        # _rev varies per database; ignore it for the comparison.
        del doc["_rev"]
        if doc != ddoc:
            if not args["quiet"]:
                print("Source replication filter does not match! Aborting.")
            exit(1)
    except requests.exceptions.HTTPError as exc:
        if exc.response.status_code == 404:
            if not args["quiet"]:
                print("Adding replication filter to source database...")
            req = requests.put(
                "http://127.0.0.1:{}/{}/_design/repl_filters".format(
                    args["local_port"], db
                ),
                data=json.dumps(ddoc),
                auth=args["creds"],
            )
            req.raise_for_status()
        else:
            # BUG FIX: previously a non-404 error was silently ignored when
            # --quiet was set (exit(1) lived inside the `elif not quiet`
            # branch).  Always exit on unexpected HTTP errors, matching the
            # other subcommand helpers.
            if not args["quiet"]:
                print(exc.response.text)
            exit(1)
| |
| |
def _do_security(args, db=None):
    """Copies the _security object from source to target DB."""
    try:
        req = requests.get(
            "http://127.0.0.1:{}/{}/_security".format(args["local_port"], db),
            auth=args["creds"],
        )
        req.raise_for_status()
        security_doc = _tojson(req)
        req = requests.put(
            "http://127.0.0.1:{}/{}/_security".format(args["clustered_port"], db),
            data=json.dumps(security_doc),
            auth=args["creds"],
        )
        req.raise_for_status()
    except requests.exceptions.HTTPError as exc:
        # Consistency fix: honor --quiet like every other handler in this
        # file (the error text was previously printed unconditionally).
        if not args["quiet"]:
            print(exc.response.text)
        exit(1)
| |
| |
def _replicate(args):
    """Replicates databases from the 1.x (node-local) port to the 2.x port.

    For each database: optionally installs the deleted-doc filter, starts a
    background thread that watches replication progress, POSTs a one-shot
    _replicate job, then copies the _security object across.
    """
    args = _args(args)
    if args["all_dbs"]:
        dbs = _do_list(args)
    else:
        dbs = args["dbs"]

    for db in dbs:
        if args["filter_deleted"]:
            _put_filter(args, db)

        if not args["quiet"]:
            print("Starting replication for " + db + "...")
        # URL-encode so database names containing '/' survive in URLs.
        db = quote(db, safe="")
        doc = {
            "continuous": False,
            "create_target": True,
            "source": {"url": "http://127.0.0.1:{}/{}".format(args["local_port"], db)},
            "target": {
                "url": "http://127.0.0.1:{}/{}".format(args["clustered_port"], db)
            },
        }
        if args["filter_deleted"]:
            doc["filter"] = "repl_filters/no_deleted"
        if args["creds"]:
            # The replicator needs credentials embedded in the job document
            # as Basic auth headers on both endpoints.
            auth = (
                "Basic " + base64.b64encode(":".join(args["creds"]).encode()).decode()
            )
            headers = {"authorization": auth}
            doc["source"]["headers"] = headers
            doc["target"]["headers"] = headers
        # The watcher thread polls progress while the _replicate POST blocks.
        watch_args = {
            y: args[y]
            for y in [
                "local_port",
                "clustered_port",
                "creds",
                "hide_progress_bar",
                "timeout",
                "quiet",
            ]
        }
        watch_args["db"] = db
        watch = threading.Thread(target=_watch_replication, kwargs=watch_args)
        watch.start()
        try:
            req = requests.post(
                "http://127.0.0.1:{}/_replicate".format(args["clustered_port"]),
                auth=args["creds"],
                data=json.dumps(doc),
                headers={"Content-type": "application/json"},
            )
            req.raise_for_status()
            req = _tojson(req)
        except requests.exceptions.HTTPError as exc:
            if not args["quiet"]:
                print(exc.response.text)
            exit(1)
        watch.join()
        if req.get("no_changes"):
            if not args["quiet"]:
                print("No changes, replication is caught up.")

        if not args["quiet"]:
            print("Copying _security object for " + db + "...")
        _do_security(args, db)

    if not args["quiet"]:
        print("Replication complete.")
| |
| |
def _rebuild(args):
    """Forces 2.x view index rebuilds by querying one view per design doc.

    Querying any single view in a design document triggers indexing of all
    views in that document, so only the first view of each ddoc is hit.
    """
    args = _args(args)
    if args["all_dbs"]:
        if args["views"]:
            if not args["quiet"]:
                print("Cannot take list of views for more than 1 database.")
            exit(1)
        # Rebuilding operates on the clustered (2.x) databases.
        args["clustered"] = True
        dbs = _do_list(args)
    else:
        dbs = [args["db"]]
    for db in dbs:
        if args["views"]:
            # BUG FIX: the positional arguments name the design documents to
            # refresh; previously they were stored in an unused local
            # (`views`), leaving `ddocs` undefined and raising NameError in
            # the loop below.
            ddocs = args["views"]
        else:
            try:
                req = requests.get(
                    "http://127.0.0.1:{}/{}/_all_docs".format(
                        args["clustered_port"], db
                    ),
                    params={"start_key": '"_design/"', "end_key": '"_design0"'},
                    auth=args["creds"],
                )
                req.raise_for_status()
                req = _tojson(req)
            except requests.exceptions.HTTPError as exc:
                if not args["quiet"]:
                    print(exc.response.text)
                exit(1)
            # Strip the "_design/" prefix from each returned id.
            ddocs = [x["id"].split("/")[1] for x in req["rows"]]
        for ddoc in ddocs:
            try:
                req = requests.get(
                    "http://127.0.0.1:{}/{}/_design/{}".format(
                        args["clustered_port"], db, ddoc
                    ),
                    auth=args["creds"],
                )
                req.raise_for_status()
                doc = _tojson(req)
            except requests.exceptions.HTTPError as exc:
                if not args["quiet"]:
                    print(exc.response.text)
                exit(1)
            if "views" not in doc:
                if not args["quiet"]:
                    print("Skipping {}/{}, no views found".format(db, ddoc))
                continue
            # only need to refresh a single view per ddoc
            if not args["quiet"]:
                print("Refreshing views in {}/{}...".format(db, ddoc))
            view = list(doc["views"].keys())[0]
            try:
                req = requests.get(
                    "http://127.0.0.1:{}/{}/_design/{}/_view/{}".format(
                        args["clustered_port"], db, ddoc, view
                    ),
                    params={"limit": 1},
                    auth=args["creds"],
                    timeout=float(args["timeout"]),
                )
            except requests.exceptions.Timeout:
                # A timeout just means indexing is in progress; not an error.
                if not args["quiet"]:
                    print("Timeout, view is processing. Moving on.")
            except requests.exceptions.HTTPError as exc:
                if not args["quiet"]:
                    print(exc.response.text)
                exit(1)
| |
| |
def _delete(args):
    """Delete node-local (1.x) databases once their clustered copy is ready.

    Refuses to delete a database whose clustered copy has fewer documents
    than the source, unless --force was given.
    """
    args = _args(args)
    if args["all_dbs"]:
        # Never bulk-delete system databases.
        args["include_system_dbs"] = False
        dbs = _do_list(args)
    else:
        dbs = args["dbs"]
    for db in dbs:
        db = quote(db, safe="")
        local_url = "http://127.0.0.1:{}/{}".format(args["local_port"], db)
        clus_url = "http://127.0.0.1:{}/{}".format(args["clustered_port"], db)
        try:
            resp = requests.get(local_url, auth=args["creds"])
            resp.raise_for_status()
            local_docs = _tojson(resp)["doc_count"]
            resp = requests.get(clus_url, auth=args["creds"])
            resp.raise_for_status()
            clus_docs = _tojson(resp)["doc_count"]
            if clus_docs < local_docs and not args["force"]:
                if not args["quiet"]:
                    print(
                        "Clustered DB has less docs than local version!"
                        " Skipping..."
                    )
                continue
            if not args["quiet"]:
                print("Deleting " + db + "...")
            resp = requests.delete(local_url, auth=args["creds"])
            resp.raise_for_status()
        except requests.exceptions.HTTPError as exc:
            if not args["quiet"]:
                print(exc.response.text)
            exit(1)
| |
| |
def main(argv):
    """Kindly do the needful."""
    # Top-level parser; each subcommand installs its handler via
    # set_defaults(func=...), dispatched at the bottom of this function.
    parser = argparse.ArgumentParser(
        prog="couchup",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=textwrap.dedent(
            """\
            Migrate CouchDB 1.x databases to CouchDB 2.x.

            Specify a subcommand and -h or --help for more help.
        """
        ),
    )

    subparsers = parser.add_subparsers()

    # --- "list" subcommand -------------------------------------------------
    parser_list = subparsers.add_parser(
        "list",
        help="lists all CouchDB 1.x databases",
        formatter_class=argparse.RawTextHelpFormatter,
        description=textwrap.dedent(
            """\
            Examples:
              couchup list
              couchup list -c -i -p mysecretpassword
        """
        ),
    )
    parser_list.add_argument(
        "-c",
        "--clustered",
        action="store_true",
        help="show clustered (2.x) databases instead",
    )
    parser_list.add_argument(
        "-i",
        "--include-system-dbs",
        action="store_true",
        help="include system databases (_users, _replicator, etc.)",
    )
    parser_list.add_argument(
        "-l", "--login", default="admin", help="specify login (default admin)"
    )
    parser_list.add_argument("-p", "--password", help="specify password")
    parser_list.add_argument(
        "--local-port", default=5986, help="override local port (default 5986)"
    )
    parser_list.add_argument(
        "--clustered-port", default=5984, help="override clustered port (default 5984)"
    )
    parser_list.set_defaults(func=_list)

    # --- "replicate" subcommand --------------------------------------------
    parser_replicate = subparsers.add_parser(
        "replicate",
        help="replicates one or more 1.x databases to CouchDB 2.x",
        formatter_class=argparse.RawTextHelpFormatter,
        description=textwrap.dedent(
            """\
            Examples:
              couchup replicate movies
              couchup replicate -f lots_of_deleted_docs_db
              couchup replicate -i -q -n _users

            Note:
              The -f/--filter-deleted option adds a replication filter
              to the source database, _design/repl_filters, that
              is used during replication to filter out deleted
              documents. This can greatly reduce the size of your
              2.x database if there are many deleted documents.

              It is IMPORTANT that no documents be deleted from the 1.x
              database during this process, or those deletions may not
              successfully replicate to the 2.x database.
        """
        ),
    )
    parser_replicate.add_argument(
        "-a", "--all-dbs", action="store_true", help="act on all databases available"
    )
    parser_replicate.add_argument(
        "-i",
        "--include-system-dbs",
        action="store_true",
        help="include system databases (_users, _replicator, etc.)",
    )
    parser_replicate.add_argument(
        "-q", "--quiet", action="store_true", help="suppress all output"
    )
    parser_replicate.add_argument(
        "-n",
        "--hide-progress-bar",
        action="store_true",
        help="suppress progress bar display",
    )
    parser_replicate.add_argument(
        "-f",
        "--filter-deleted",
        action="store_true",
        help="filter deleted document tombstones during replication",
    )
    parser_replicate.add_argument(
        "-t",
        "--timeout",
        default=30,
        help="stalled replication timeout threshhold in s (def: 30)",
    )
    parser_replicate.add_argument(
        "-l", "--login", default="admin", help="specify login (default admin)"
    )
    parser_replicate.add_argument("-p", "--password", help="specify password")
    parser_replicate.add_argument(
        "--local-port", default=5986, help="override local port (default 5986)"
    )
    parser_replicate.add_argument(
        "--clustered-port", default=5984, help="override clustered port (default 5984)"
    )
    parser_replicate.add_argument(
        "dbs", metavar="db", type=str, nargs="*", help="database(s) to be processed"
    )
    parser_replicate.set_defaults(func=_replicate)

    # --- "rebuild" subcommand ----------------------------------------------
    parser_rebuild = subparsers.add_parser(
        "rebuild",
        help="rebuilds one or more CouchDB 2.x views",
        formatter_class=argparse.RawTextHelpFormatter,
        description=textwrap.dedent(
            """\
            Examples:
              couchup rebuild movies
              couchup rebuild movies by_name
              couchup rebuild -a -q -p mysecretpassword
        """
        ),
    )
    parser_rebuild.add_argument(
        "-a", "--all-dbs", action="store_true", help="act on all databases available"
    )
    parser_rebuild.add_argument(
        "-q", "--quiet", action="store_true", help="suppress all output"
    )
    parser_rebuild.add_argument(
        "-t",
        "--timeout",
        default=5,
        help="timeout for waiting for view rebuild in s (default: 5)",
    )
    parser_rebuild.add_argument(
        "-i",
        "--include-system-dbs",
        action="store_true",
        help="include system databases (_users, _replicator, etc.)",
    )
    parser_rebuild.add_argument(
        "-l", "--login", default="admin", help="specify login (default admin)"
    )
    parser_rebuild.add_argument("-p", "--password", help="specify password")
    parser_rebuild.add_argument(
        "--local-port", default=5986, help="override local port (default 5986)"
    )
    parser_rebuild.add_argument(
        "--clustered-port", default=5984, help="override clustered port (default 5984)"
    )
    parser_rebuild.add_argument(
        "db", metavar="db", type=str, nargs="?", help="database to be processed"
    )
    parser_rebuild.add_argument(
        "views",
        metavar="view",
        type=str,
        nargs="*",
        help="view(s) to be processed (all by default)",
    )
    parser_rebuild.set_defaults(func=_rebuild)

    # --- "delete" subcommand -----------------------------------------------
    parser_delete = subparsers.add_parser(
        "delete",
        help="deletes one or more CouchDB 1.x databases",
        formatter_class=argparse.RawTextHelpFormatter,
        description=textwrap.dedent(
            """\
            Examples:
              couchup delete movies
              couchup delete -q -p mysecretpassword movies
        """
        ),
    )
    parser_delete.add_argument(
        "-a", "--all-dbs", action="store_true", help="act on all databases available"
    )
    parser_delete.add_argument(
        "-f",
        "--force",
        action="store_true",
        help="force deletion even if 1.x and 2.x databases are not identical",
    )
    parser_delete.add_argument(
        "-q", "--quiet", action="store_true", help="suppress all output"
    )
    parser_delete.add_argument(
        "-l", "--login", default="admin", help="specify login (default admin)"
    )
    parser_delete.add_argument("-p", "--password", help="specify password")
    parser_delete.add_argument(
        "--local-port", default=5986, help="override local port (default 5986)"
    )
    parser_delete.add_argument(
        "--clustered-port", default=5984, help="override clustered port (default 5984)"
    )
    parser_delete.add_argument(
        "dbs", metavar="db", type=str, nargs="*", help="database(s) to be processed"
    )
    parser_delete.set_defaults(func=_delete)

    args = parser.parse_args(argv[1:])
    try:
        args.func(args)
    except AttributeError:
        # No subcommand given: args has no .func attribute, so show help.
        parser.print_help()
        sys.exit(0)
| |
| |
# Script entry point: pass the full argv so main() can strip the program name.
if __name__ == "__main__":
    main(sys.argv)