# */
# * Licensed to the Apache Software Foundation (ASF) under one
# * or more contributor license agreements.  See the NOTICE file
# * distributed with this work for additional information
# * regarding copyright ownership.  The ASF licenses this file
# * to you under the Apache License, Version 2.0 (the
# * "License"); you may not use this file except in compliance
# * with the License.  You may obtain a copy of the License at
# *
# *   http://www.apache.org/licenses/LICENSE-2.0
# *
# * Unless required by applicable law or agreed to in writing,
# * software distributed under the License is distributed on an
# * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# * KIND, either express or implied.  See the License for the
#    * specific language governing permissions and limitations
# * under the License.
# */

from Queue import Empty
import json
import logging
import sys
from multiprocessing import Queue, Process
import traceback
from logging.handlers import RotatingFileHandler
import time

import argparse

from usergrid import UsergridClient, UsergridError

__author__ = 'Jeff.West@yahoo.com'

logger = logging.getLogger('UsergridIterator')

# SAMPLE CONFIG FILE for source and target
sample_config = {
    "endpoint": {
        "api_url": "https://api.usergrid.com",
        "limit": 100
    },

    "credentials": {
        "myOrg": {
            "client_id": "<<client_id>>",
            "client_secret": "<<client_secret>>"
        }
    }
}


def init_logging(file_enabled=False, stdout_enabled=True):
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    logging.getLogger('urllib3.connectionpool').setLevel(logging.WARN)
    logging.getLogger('requests.packages.urllib3.connectionpool').setLevel(logging.WARN)

    log_formatter = logging.Formatter(fmt='%(asctime)s | %(name)s | %(processName)s | %(levelname)s | %(message)s',
                                      datefmt='%m/%d/%Y %I:%M:%S %p')

    if file_enabled:
        log_file_name = './UsergridIterator.log'

        rotating_file = logging.handlers.RotatingFileHandler(filename=log_file_name,
                                                             mode='a',
                                                             maxBytes=204857600,
                                                             backupCount=10)
        rotating_file.setFormatter(log_formatter)
        rotating_file.setLevel(logging.INFO)

        root_logger.addHandler(rotating_file)

    if stdout_enabled:
        stdout_logger = logging.StreamHandler(sys.stdout)
        stdout_logger.setFormatter(log_formatter)
        stdout_logger.setLevel(logging.INFO)

        root_logger.addHandler(stdout_logger)


config = {}


class Worker(Process):
    """
    The worker is used to perform a set of handler functions in a chain.  Work is provided for the Worker thread(s) on
    a JoinableQueue.  The thread will continue until either 1) it is explicitly terminated or 2) until it does not
     receive work on the queue after a consecutive number of attempts (max_empty_count) using the specified timeout
     (queue_timeout)
    """

    def __init__(self,
                 queue,
                 source_client,
                 target_client,
                 max_empty_count=3,
                 queue_timeout=10,
                 function_chain=None):
        """
        This is an example handler function which can transform an entity. Multiple handler functions can be used to
        process a entity.  The response is an entity which will get passed to the next handler in the chain

        :param queue: The queue on which to listen for work
        :param source_client: The UsergridClient of the source Usergrid instance
        :param target_client: The UsergridClient of the target Usergrid instance
        :param max_empty_count: The maximum number of times for a worker to not receive work after checking the queue
        :param queue_timeout: The timeout for waiting for work on the queue
        :param function_chain: An array of function pointers which will be executed in array sequence, expeting the following parameters: org_name, app_name, collection_name, entity, source_client, target_client, attempts=0p
        """

        super(Worker, self).__init__()
        logger.warning('Creating worker!')

        if not function_chain:
            function_chain = []

        self.function_chain = function_chain
        self.queue = queue
        self.source_client = source_client
        self.target_client = target_client
        self.max_empty_count = max_empty_count
        self.queue_timeout = queue_timeout

    def run(self):
        logger.info('starting run()...')
        keep_going = True

        count_processed = 0
        count_failed = 0
        empty_count = 0

        while keep_going:

            try:
                org, app, collection_name, entity = self.queue.get(timeout=self.queue_timeout)

                empty_count = 0
                success = True
                entity_param = entity

                for handler in self.function_chain:

                    if entity_param is not None:
                        try:
                            entity_param = handler(org, app, collection_name, entity_param, self.source_client,
                                                   self.target_client)
                        except Exception, e:
                            logger.error(e)
                            print traceback.format_exc()
                            success = False

                if success:
                    count_processed += 1
                    logger.info('Processed [%sth] SUCCESS app/collection/name/uuid = %s / %s / %s / %s' % (
                        count_processed, app, collection_name, entity.get('name'), entity.get('uuid')))
                else:
                    count_failed += 1
                    logger.warning('Processed [%sth] FAILURE app/collection/name/uuid = %s / %s / %s / %s' % (
                        count_processed, app, collection_name, entity.get('name'), entity.get('uuid')))

            except KeyboardInterrupt, e:
                raise e

            except Empty:
                logger.warning(
                    'No task received after timeout=[%s]! Empty Count=%s' % (self.queue_timeout, empty_count))

                empty_count += 1

                if empty_count >= self.max_empty_count:
                    logger.warning('Stopping work after empty_count=[%s]' % empty_count)
                    keep_going = False

        logger.info('Worker finished!')


def filter_entity(org_name, app_name, collection_name, entity_data, source_client, target_client, attempts=0):
    """
    This is an example handler function which can filter entities. Multiple handler functions can be used to
    process a entity.  The response is an entity which will get passed to the next handler in the chain

    :param org_name: The org name from whence this entity came
    :param app_name: The app name from whence this entity came
    :param collection_name: The collection name from whence this entity came
    :param entity: The entity retrieved from the source instance
    :param source_client: The UsergridClient for the source Usergrid instance
    :param target_client: The UsergridClient for the target Usergrid instance
    :param attempts: the number of previous attempts this function was run (manual, not part of the framework)
    :return: an entity.  If response is None then the chain will stop.
    """

    # return None if you want to stop the chain (filter the entity out)
    if 'blah' in entity_data:
        return None

    # return the entity to keep going
    return entity_data


def transform_entity(org_name, app_name, collection_name, entity_data, source_client, target_client, attempts=0):
    """
    This is an example handler function which can transform an entity. Multiple handler functions can be used to
    process a entity.  The response is an entity which will get passed to the next handler in the chain

    :param org_name: The org name from whence this entity came
    :param app_name: The app name from whence this entity came
    :param collection_name: The collection name from whence this entity came
    :param entity: The entity retrieved from the source instance
    :param source_client: The UsergridClient for the source Usergrid instance
    :param target_client: The UsergridClient for the target Usergrid instance
    :param attempts: the number of previous attempts this function was run (manual, not part of the framework)
    :return: an entity.  If response is None then the chain will stop.
    """
    # this just returns the entity with no transform
    return entity_data


def create_new(org_name, app_name, collection_name, entity_data, source_client, target_client, attempts=0):
    """
    This is an example handler function which can be used to create a new entity in the target instance (based on the
    target_client) parameter. Multiple handler functions can be used to process a entity.  The response is an entity
    which will get passed to the next handler in the chain

    :param org_name: The org name from whence this entity came
    :param app_name: The app name from whence this entity came
    :param collection_name: The collection name from whence this entity came
    :param entity_data: The entity retrieved from the source instance
    :param source_client: The UsergridClient for the source Usergrid instance
    :param target_client: The UsergridClient for the target Usergrid instance
    :param attempts: the number of previous attempts this function was run (manual, not part of the framework)
    :return: an entity.  If response is None then the chain will stop.
    """

    attempts += 1

    if 'metadata' in entity_data: entity_data.pop('metadata')

    target_org = config.get('target_org')
    target_app = config.get('app_mapping', {}).get(app_name, app_name)
    target_collection = config.get('collection_mapping', {}).get(collection_name, collection_name)

    if target_client:
        try:
            c = target_client.org(target_org).app(target_app).collection(target_collection)
            e = c.entity_from_data(entity_data)
            e.put()

        except UsergridError as err:
            logger.error(err)
            raise err

    return None


def parse_args():
    parser = argparse.ArgumentParser(description='Usergrid App/Collection Iterator')

    parser.add_argument('-o', '--org',
                        help='Name of the org to migrate',
                        type=str,
                        required=True)

    parser.add_argument('-a', '--app',
                        help='Multiple, name of apps to include, skip to include all',
                        default=[],
                        action='append')

    parser.add_argument('-c', '--collection',
                        help='Multiple, name of collections to include, skip to include all',
                        default=[],
                        action='append')

    parser.add_argument('--ql',
                        help='The Query string for processing the source collection(s)',
                        type=str,
                        default='select *')

    parser.add_argument('-s', '--source_config',
                        help='The configuration of the source endpoint/org',
                        type=str,
                        default='source.json')

    parser.add_argument('-d', '--target_config',
                        help='The configuration of the target endpoint/org',
                        type=str,
                        default='destination.json')

    parser.add_argument('-w', '--workers',
                        help='The number of worker threads',
                        type=int,
                        default=1)

    parser.add_argument('-f', '--force',
                        help='Force an update regardless of modified date',
                        type=bool,
                        default=False)

    parser.add_argument('--max_empty_count',
                        help='The number of iterations for an individual worker to receive no work before stopping',
                        type=int,
                        default=3)

    parser.add_argument('--queue_timeout',
                        help='The duration in seconds for an individual worker queue poll before Empty is raised',
                        type=int,
                        default=10)

    parser.add_argument('--map_app',
                        help="A colon-separated string such as 'apples:oranges' which indicates to put data from the app named 'apples' from the source endpoint into app named 'oranges' in the target endpoint",
                        default=[],
                        action='append')

    parser.add_argument('--map_collection',
                        help="A colon-separated string such as 'cats:dogs' which indicates to put data from collections named 'cats' from the source endpoint into a collection named 'dogs' in the target endpoint, applicable to all apps",
                        default=[],
                        action='append')

    parser.add_argument('--target_org',
                        help="The org name at the Usergrid destination instance",
                        type=str)

    my_args = parser.parse_args(sys.argv[1:])

    return vars(my_args)


def init():
    global config

    config['collection_mapping'] = {}
    config['app_mapping'] = {}
    config['org_mapping'] = {}

    with open(config.get('source_config'), 'r') as f:
        config['source_config'] = json.load(f)

    with open(config.get('target_config'), 'r') as f:
        config['target_config'] = json.load(f)

    for mapping in config.get('map_collection', []):
        parts = mapping.split(':')

        if len(parts) == 2:
            config['collection_mapping'][parts[0]] = parts[1]
        else:
            logger.warning('Skipping malformed Collection mapping: [%s]' % mapping)

    for mapping in config.get('map_app', []):
        parts = mapping.split(':')

        if len(parts) == 2:
            config['app_mapping'][parts[0]] = parts[1]
        else:
            logger.warning('Skipping malformed App mapping: [%s]' % mapping)

    for mapping in config.get('map_org', []):
        parts = mapping.split(':')

        if len(parts) == 2:
            config['org_mapping'][parts[0]] = parts[1]
        else:
            logger.warning('Skipping Org mapping: [%s]' % mapping)

    if 'source_config' in config:
        config['source_endpoint'] = config['source_config'].get('endpoint').copy()
        config['source_endpoint'].update(config['source_config']['credentials'][config['org']])

    config['target_org'] = config['target_org'] if config['target_org'] else config['org']

    if 'target_config' in config:
        config['target_endpoint'] = config['target_config'].get('endpoint').copy()
        config['target_endpoint'].update(config['target_config']['credentials'][config['target_org']])


def wait_for(arr_threads, sleep_time=3):
    """
    This function pauses the thread until the array of threads which is provided all stop working

    :param arr_threads: an array of Process objects to monitor
    :param sleep_time: the time to sleep between evaluating the array for completion
    :return: None
    """
    threads_working = 100

    while threads_working > 0:
        threads_working = 0

        for t in arr_threads:

            if t.is_alive():
                threads_working += 1

        if threads_working > 0:
            logger.warn('Waiting for [%s] threads to finish...' % threads_working)
            time.sleep(sleep_time)

    logger.warn('Worker Threads finished!')


class UsergridIterator:
    def __init__(self):
        pass

    def get_to_work(self):
        global config

        queue = Queue()
        logger.warning('Starting workers...')

        apps_to_process = config.get('app')
        collections_to_process = config.get('collection')
        source_org = config['org']
        target_org = config.get('target_org', config.get('org'))

        source_client = None
        target_client = None

        try:
            source_client = UsergridClient(api_url=config['source_endpoint']['api_url'],
                                           org_name=source_org)
            source_client.authenticate_management_client(
                client_credentials=config['source_config']['credentials'][source_org])

        except UsergridError, e:
            logger.critical(e)
            exit()

        if 'target_endpoint' in config:
            try:
                target_client = UsergridClient(api_url=config['target_endpoint']['api_url'],
                                               org_name=target_org)
                target_client.authenticate_management_client(
                    client_credentials=config['target_config']['credentials'][target_org])

            except UsergridError, e:
                logger.critical(e)
                exit()

        function_chain = [filter_entity, transform_entity, create_new]

        workers = [Worker(queue=queue,
                          source_client=source_client,
                          target_client=target_client,
                          function_chain=function_chain,
                          max_empty_count=config.get('max_empty_count', 3),
                          queue_timeout=config.get('queue_timeout', 10))

                   for x in xrange(config.get('workers', 1))]

        [w.start() for w in workers]

        for app in source_client.list_apps():

            if len(apps_to_process) > 0 and app not in apps_to_process:
                logger.warning('Skipping app=[%s]' % app)
                continue

            logger.warning('Processing app=[%s]' % app)

            source_app = source_client.organization(source_org).application(app)

            for collection_name, collection in source_app.list_collections().iteritems():

                if collection_name in ['events', 'queues']:
                    logger.warning('Skipping internal collection=[%s]' % collection_name)
                    continue

                if len(collections_to_process) > 0 and collection_name not in collections_to_process:
                    logger.warning('Skipping collection=[%s]' % collection_name)
                    continue

                logger.warning('Processing collection=%s' % collection_name)

                counter = 0

                try:
                    for entity in collection.query(ql=config.get('ql'),
                                                   limit=config.get('source_endpoint', {}).get('limit', 100)):
                        counter += 1
                        queue.put((config.get('org'), app, collection_name, entity))

                except KeyboardInterrupt:
                    [w.terminate() for w in workers]

            logger.info('Publishing entities complete!')

        wait_for(workers)

        logger.info('All done!!')


def main():
    global config
    config = parse_args()
    init()

    init_logging()

    UsergridIterator().get_to_work()


if __name__ == '__main__':
    main()
