# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import print_function

import argparse
import json
import random
import re
import sys
import uuid
from multiprocessing import JoinableQueue, Process
from Queue import Empty

import loremipsum

__author__ = 'Jeff.West@yahoo.com'
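

# Generates random JSON documents (one per line) for load-testing an
# Elasticsearch index, fanning the generation out across worker processes.
# Example invocation (the script filename below is illustrative; use
# whatever name this file is saved under):
#
#   python index_test_doc_generator.py -w 8 -dc 1000000 --output docs.txt
#
# Requires Python 2 and the third-party `loremipsum` package.

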
def parse_args():
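    """Parse command-line options and return them as a dict."""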
    parser = argparse.ArgumentParser(
        description='Generate random JSON documents for Elasticsearch index testing')

    parser.add_argument('-w', '--workers',
                        help='The number of worker processes',
                        type=int,
                        default=8)

    parser.add_argument('-dc', '--document_count',
                        help='The total number of documents to generate',
                        type=long,
                        default=100000000)

    parser.add_argument('--output',
                        help='The filename to write to',
                        type=str,
                        default='generated_documents.txt')

    parser.add_argument('--fields_min',
                        help='The min number of fields per document',
                        type=long,
                        default=10)

    parser.add_argument('--fields_max',
                        help='The max number of fields per document',
                        type=long,
                        default=100)

    parser.add_argument('-tp', '--type_prefix',
                        help='The prefix to use for type names',
                        type=str,
                        default='type_this')

    my_args = parser.parse_args(sys.argv[1:])

    return vars(my_args)


args = parse_args()


class Worker(Process):
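    """Worker process that pulls generation tasks from work_queue, builds a
    random document for each, and puts the flattened result on
    response_queue."""
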
    def __init__(self, work_queue, response_queue):
        super(Worker, self).__init__()
        self.work_queue = work_queue
        self.response_queue = response_queue
        self.sentence_list = loremipsum.get_sentences(1000)
        # [A-Za-z], not [A-z]: the latter also matches [, \, ], ^, _ and `.
        self.re_first_word = re.compile('([A-Za-z]+)')

    def run(self):
        print('Starting %s' % self.name)
        while True:
            try:
                task = self.work_queue.get(timeout=600)
            except Empty:
                # No work for 10 minutes; assume the producer is finished.
                break
            field_count = random.randint(task['fields_min'], task['fields_max'])
            document = self.generate_document(field_count)
            flattened_doc = self.process_document(document,
                                                  task['uuid'],
                                                  task['uuid'])
            self.response_queue.put(flattened_doc)
            self.work_queue.task_done()

    def generate_document(self, fields):
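        """Build a dict with `fields` randomly keyed and typed entries:
        roughly 30% sentences, 20% ints, 10% floats, 10% booleans,
        10% nested sub-documents, and 20% geo locations."""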
        doc = {}
        my_bool = True

        for i in xrange(fields):
            sentence_index = random.randint(0, max((fields / 2) - 1, 1))
            sentence = self.sentence_list[sentence_index]

            # Reuse a word from the sentence as the key; append the loop
            # index half of the time to reduce key collisions.
            if random.random() >= .5:
                key = self.re_first_word.findall(sentence)[1]
            else:
                key = self.re_first_word.findall(sentence)[1] + str(i)

            field_type = random.random()

            if field_type <= 0.3:
                doc[key] = sentence
            elif field_type <= 0.5:
                doc[key] = random.randint(1, 1000000)
            elif field_type <= 0.6:
                doc[key] = random.random() * 1000000000
            elif field_type <= 0.7:
                doc[key] = my_bool
                my_bool = not my_bool
            elif field_type <= 0.8:
                doc[key] = self.generate_document(max(fields / 5, 1))
            else:
                doc['mylocation'] = self.generate_location()

        return doc

    @staticmethod
    def get_fields(document, base_name=None):
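        """Flatten a (possibly nested) document into a list of typed fields.

        Nested dicts recurse with dotted names ('parent.child'); each leaf
        becomes {'name': ..., 'string'|'long'|'double'|'boolean': value}.
        """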
        fields = []

        for name, value in document.iteritems():
            if base_name:
                field_name = '%s.%s' % (base_name, name)
            else:
                field_name = name

            if isinstance(value, dict):
                fields += Worker.get_fields(value, field_name)
            else:
                # Map the Python type to a type label; bool is checked
                # before int because bool subclasses int.
                value_name = None
                if isinstance(value, basestring):
                    value_name = 'string'
                elif isinstance(value, bool):
                    value_name = 'boolean'
                elif isinstance(value, (int, long)):
                    value_name = 'long'
                elif isinstance(value, float):
                    value_name = 'double'

                if value_name:
                    field = {
                        'name': field_name,
                        value_name: value
                    }
                else:
                    field = {
                        'name': field_name,
                        'string': str(value)
                    }

                fields.append(field)

        return fields

    @staticmethod
    def process_document(document, application_id, uuid):
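        """Wrap a document's flattened fields in an entity envelope keyed by
        the entity UUID."""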
        return {
            'entityId': uuid,
            'entityVersion': '1',
            'applicationId': application_id,
            'fields': Worker.get_fields(document)
        }

    def generate_location(self):
        # Pick a random coordinate, flipping each sign with 50% probability
        # so points land in all four quadrants.
        lat = random.random() * 90.0
        lon = random.random() * 180.0
        if random.random() < .5:
            lat = -lat
        if random.random() < .5:
            lon = -lon
        return {
            'location': {
                'lat': lat,
                'lon': lon
            }
        }


class Writer(Process):
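    """Writer process that drains the response queue, writing one JSON
    document per line to the output file; exits after 300 seconds with no
    new documents."""
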
    def __init__(self, document_queue):
        super(Writer, self).__init__()
        self.document_queue = document_queue

    def run(self):
        keep_going = True
        with open(args['output'], 'w') as f:
            while keep_going:
                try:
                    document = self.document_queue.get(timeout=300)
                    print(json.dumps(document), file=f)
                    # Mark the item processed so response_queue.join() in
                    # main() can return once everything is written.
                    self.document_queue.task_done()
                except Empty:
                    print('done!')
                    keep_going = False


def total_milliseconds(td):
    # Convert a timedelta to whole milliseconds, including any days component.
    return int(td.total_seconds() * 1000)


def main():
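    """Start the writer and worker processes, then enqueue generation tasks
    in batches of 100,000, joining the queues between batches."""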
    work_queue = JoinableQueue()
    response_queue = JoinableQueue()

    workers = [Worker(work_queue, response_queue)
               for _ in xrange(args['workers'])]

    writer = Writer(response_queue)
    writer.start()

    for worker in workers:
        worker.start()
    try:
        total_messages = args.get('document_count')
        batch_size = 100000
        # Enqueue tasks in batches, joining both queues after each batch so
        # the in-flight backlog stays bounded.
        for batch_number in xrange((total_messages + batch_size - 1) // batch_size):
            batch = min(batch_size, total_messages - batch_number * batch_size)
            for _ in xrange(batch):
                doc_id = str(uuid.uuid1())
                task = {
                    'fields_min': args['fields_min'],
                    'fields_max': args['fields_max'],
                    'uuid': doc_id
                }
                work_queue.put(task)
            print('Joining queues batch=[%s]...' % (batch_number + 1))
            work_queue.join()
            response_queue.join()
            print('Done queue batch=[%s]...' % (batch_number + 1))
    except KeyboardInterrupt:
        for worker in workers:
            worker.terminate()


if __name__ == '__main__':
    main()