# -*- coding: utf-8 -*-
# */
# * Licensed to the Apache Software Foundation (ASF) under one
# * or more contributor license agreements. See the NOTICE file
# * distributed with this work for additional information
# * regarding copyright ownership. The ASF licenses this file
# * to you under the Apache License, Version 2.0 (the
# * "License"); you may not use this file except in compliance
# * with the License. You may obtain a copy of the License at
# *
# * http://www.apache.org/licenses/LICENSE-2.0
# *
# * Unless required by applicable law or agreed to in writing,
# * software distributed under the License is distributed on an
# * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# * KIND, either express or implied. See the License for the
# * specific language governing permissions and limitations
# * under the License.
# */
import json
import logging
import traceback
from multiprocessing import Pool
import datetime
import socket
import argparse
import requests
import time
from logging.handlers import RotatingFileHandler
import sys
__author__ = 'Jeff.West@yahoo.com'
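# Indexing-latency test for Usergrid: create a batch of entities in a
# uniquely named collection, poll a query until all of them are returned
# (i.e. indexed), then delete them.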
entity_template = {
"id": "replaced",
"dataType": "entitlements",
"mockData": [
{"importDate": "2015-08-25T23:33:57.124Z", "rowsImported": 2},
{"role": "line-owner", "route": "/master", "element": "element1", "entitlementId": "entitlement4",
"property": "show"},
{"role": "line-owner", "route": "/master", "element": "element2", "entitlementId": "entitlement8",
"property": "hide"}
],
"nullArray1": [None],
"nullArray2": [None, None],
"nullArray3": [None, None],
"nest1": {
"nest2": {
"nest3": [None, None, 'foo']
}
}
}
# Alternative payload (not used by default): the queries in main() filter on
# dataType='entitlements', which only entity_template above carries, so this
# second template must not overwrite it.
customer_template = {
"type": "customerstatuses",
"created": 1454769737888,
"modified": 1454781811473,
"address": {
"zip": "35873",
"city": "málaga",
"street": "3430 calle de bravo murillo",
"state": "melilla"
},
"DOB": "787264244",
"email": "begoña.caballero29@example.com",
"firstName": "Begoña",
"lastName": "Caballero",
"lastSeenDateTime": 1447737158857,
"locationStatus": "Entrance",
"loyaltyAccountNumber": "1234",
"loyaltyLevel": "basic",
"phone": "966-450-469",
"profilePictureUrl": "http://api.randomuser.me/portraits/thumb/women/61.jpg",
"status": "Entrance",
"storeId": 12121
}
url_template = '{api_url}/{org}/{app}/{collection}'
token_url_template = '{api_url}/{org}/{app}/token'
config = {}
session = requests.Session()
logger = logging.getLogger('UsergridBatchIndexTest')
def init_logging(stdout_enabled=True):
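    """Configure a rotating file handler and, optionally, stdout logging at INFO level."""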
root_logger = logging.getLogger()
log_file_name = './usergrid_index_test.log'
log_formatter = logging.Formatter(fmt='%(asctime)s | %(name)s | %(processName)s | %(levelname)s | %(message)s',
datefmt='%m/%d/%Y %I:%M:%S %p')
    rotating_file = RotatingFileHandler(filename=log_file_name,
                                        mode='a',
                                        maxBytes=2048576000,
                                        backupCount=10)
rotating_file.setFormatter(log_formatter)
rotating_file.setLevel(logging.INFO)
root_logger.addHandler(rotating_file)
root_logger.setLevel(logging.INFO)
logging.getLogger('urllib3.connectionpool').setLevel(logging.WARN)
logging.getLogger('requests.packages.urllib3.connectionpool').setLevel(logging.WARN)
if stdout_enabled:
stdout_logger = logging.StreamHandler(sys.stdout)
stdout_logger.setFormatter(log_formatter)
stdout_logger.setLevel(logging.INFO)
root_logger.addHandler(stdout_logger)
def create_entity(work_item):
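    """POST a single (url, entity) work item; return (uuid, entity) on success, None otherwise."""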
global config
try:
url = work_item[0]
entity = work_item[1]
# entity['name'] = datetime.datetime.now().strftime('name-%yx%mx%dx%Hx%Mx%S')
logger.info('creating entity [%s] at URL [%s]' % (entity.get('id'), url))
r = session.post(url, data=json.dumps(entity))
        if r.status_code != 200:
            logger.error('HTTP %s: %s' % (r.status_code, r.text))
            return
        entities = r.json().get('entities', [])
        uuid = entities[0].get('uuid')
        logger.info('Created entity UUID=[%s] at URL [%s]' % (uuid, url))
        return uuid, entity
    except Exception:
        logger.error(traceback.format_exc())
def test_multiple(number_of_entities):
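    """Create number_of_entities entities in parallel via the worker pool; return a {uuid: entity} map."""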
global config
start = datetime.datetime.now()
logger.info('Creating %s entities w/ url=%s' % (number_of_entities, config['url']))
created_map = {}
work_items = []
    for x in range(1, number_of_entities + 1):
entity = entity_template.copy()
entity['id'] = str(x)
work_items.append((config['url'], entity))
responses = processes.map(create_entity, work_items)
    for res in responses:
        # create_entity returns None on failure; skip those
        if res is not None:
            created_map[res[0]] = res[1]
stop = datetime.datetime.now()
logger.info('Created [%s] entities in %s' % (number_of_entities, (stop - start)))
return created_map
def wait_for_indexing(created_map, q_url, sleep_time=0.0):
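    """Poll q_url until every entity in created_map appears in the query results."""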
logger.info('Waiting for indexing of [%s] entities...' % len(created_map))
    count_missing = 100  # arbitrary non-zero value so the loop runs at least once
start_time = datetime.datetime.now()
while count_missing > 0:
entity_map = {}
r = session.get(q_url)
res = r.json()
entities = res.get('entities', [])
now_time = datetime.datetime.now()
elapsed = now_time - start_time
        logger.info('Found [%s] of [%s] entities ([%s] missing) after [%s] at url: %s' % (
            len(entities), len(created_map), (len(created_map) - len(entities)), elapsed, q_url))
count_missing = 0
for entity in entities:
entity_map[entity.get('uuid')] = entity
        for uuid, created_entity in created_map.items():
if uuid not in entity_map:
count_missing += 1
logger.info('Missing uuid=[%s] Id=[%s] total missing=[%s]' % (
uuid, created_entity.get('id'), count_missing))
if count_missing > 0:
            logger.info('Waiting for indexing, count_missing=[%s] Total time [%s] Sleeping for [%s]s' % (
                count_missing, elapsed, sleep_time))
time.sleep(sleep_time)
stop_time = datetime.datetime.now()
logger.info('All entities found after %s' % (stop_time - start_time))
def clear(clear_url):
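    """DELETE against clear_url, recursing until no entities are returned."""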
logger.info('deleting.... ' + clear_url)
r = session.delete(clear_url)
    if r.status_code != 200:
        logger.error('error deleting url=' + clear_url)
        logger.error(json.dumps(r.json()))
else:
res = r.json()
len_entities = len(res.get('entities', []))
if len_entities > 0:
clear(clear_url)
def test_cleared(q_url):
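    """Check that a query against q_url returns no entities."""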
r = session.get(q_url)
    if r.status_code != 200:
        logger.error(json.dumps(r.json()))
    else:
        res = r.json()
        if len(res.get('entities', [])) != 0:
            logger.error('DID NOT CLEAR')
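# Shared worker pool used by test_multiple(). Note: on fork-based platforms the
# workers inherit a copy of `session` as it exists at import time, so headers
# set later in main() (e.g. the Authorization header) may not reach them.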
processes = Pool(32)
def test_url(q_url, sleep_time=0.25):
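    """Poll q_url until it returns HTTP 200 with at least one entity."""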
    passed = False
    while not passed:
        r = session.get(q_url)
        if r.status_code == 200:
            if len(r.json().get('entities', [])) >= 1:
                passed = True
        else:
            logger.info('HTTP %s on GET %s' % (r.status_code, q_url))
        if passed:
            logger.info('Test of URL [%s] passed' % q_url)
        else:
            logger.info('Test of URL [%s] not passed yet, retrying' % q_url)
            time.sleep(sleep_time)
def parse_args():
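    """Parse command-line arguments and return them as a dict."""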
parser = argparse.ArgumentParser(description='Usergrid Indexing Latency Test')
parser.add_argument('-o', '--org',
help='Name of the org to perform the test in',
type=str,
required=True)
parser.add_argument('-a', '--app',
help='Name of the app to perform the test in',
type=str,
required=True)
parser.add_argument('--base_url',
help='The URL of the Usergrid Instance',
type=str,
required=True)
parser.add_argument('--client_id',
help='The Client ID to get a token, if needed',
type=str,
required=False)
parser.add_argument('--client_secret',
help='The Client Secret to get a token, if needed',
type=str,
required=False)
my_args = parser.parse_args(sys.argv[1:])
return vars(my_args)
def init():
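    """Derive the collection URL (unique per host and timestamp) and token URL from the config."""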
global config
url_data = {
'api_url': config.get('base_url'),
'org': config.get('org'),
'app': config.get('app'),
'collection': '%s-%s' % (socket.gethostname(), datetime.datetime.now().strftime('index-test-%yx%mx%dx%Hx%Mx%S'))
}
config['url'] = url_template.format(**url_data)
config['token_url'] = token_url_template.format(**url_data)
def main():
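    """Authenticate if credentials were given, create entities, wait for indexing, then clean up."""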
global config
config = parse_args()
init()
init_logging()
if config.get('client_id') is not None and config.get('client_secret') is not None:
token_request = {
'grant_type': 'client_credentials',
'client_id': config.get('client_id'),
'client_secret': config.get('client_secret')
}
r = session.post(config.get('token_url'), json.dumps(token_request))
if r.status_code == 200:
access_token = r.json().get('access_token')
session.headers.update({'Authorization': 'Bearer %s' % access_token})
else:
logger.critical('unable to get token: %s' % r.text)
            sys.exit(1)
try:
created_map = test_multiple(999)
q_url = config.get('url') + "?ql=select * where dataType='entitlements'&limit=1000"
wait_for_indexing(created_map=created_map,
q_url=q_url,
sleep_time=1)
delete_q_url = config.get('url') + "?ql=select * where dataType='entitlements'&limit=1000"
clear(clear_url=delete_q_url)
    except KeyboardInterrupt:
        pass
    finally:
        processes.terminate()
if __name__ == '__main__':
    main()