#!/usr/bin/python
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import md5
import time
import ConnectorHelpers
from sqatools import LicenseMakerClient
from sqatools import appliance
import ZPF_Filter
sys.path.append("/usr/lib/metacarta")
import MetaCartaVersion
# Copy a folder to a (new) area
def copy_folder( source, target ):
    appliance.spcall( [ "mkdir", "-p", target ] )
    appliance.spcall( [ "cp", "-r", source, target ] )

# Remove a folder
def delete_folder( target ):
    appliance.spcall( [ "rm", "-rf", target ] )
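
# Note: appliance.spcall is assumed to run the given argv list as a subprocess
# (here: mkdir -p, cp -r, and rm -rf), so these helpers shell out rather than
# using Python file operations directly.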
# Crawl user credentials
username = "testingest"
password = "testingest"
def save_local_files():
    """ Generate files by replicating 10 test files 650,000 times """
    # We need 6,500,000 documents.  We'll get this by having 10 documents under
    # 6 levels of a hierarchy, and truncating when we reach the proper count.
    filecount = 0
    max_file_count = 6500000
    level0 = 0
    while level0 < 10 and filecount < max_file_count:
        level1 = 0
        while level1 < 10 and filecount < max_file_count:
            level2 = 0
            while level2 < 10 and filecount < max_file_count:
                level3 = 0
                while level3 < 10 and filecount < max_file_count:
                    level4 = 0
                    while level4 < 10 and filecount < max_file_count:
                        level5 = 0
                        while level5 < 10 and filecount < max_file_count:
                            pathname = "/common/crawlarea/%d/%d/%d/%d/%d/%d" % (level0,level1,level2,level3,level4,level5)
                            copy_folder("/root/largefiles",pathname)
                            filecount += 10
                            level5 += 1
                        level4 += 1
                    level3 += 1
                level2 += 1
            level1 += 1
        level0 += 1
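
# With the loop above, each leaf path such as /common/crawlarea/0/0/0/0/0/0 receives a
# recursive copy of /root/largefiles (assumed to contain the 10 *.htm seed documents),
# so roughly 650,000 leaf directories exist by the time filecount reaches 6,500,000.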

def convert_url_to_filename( url ):
    """ Convert a url to a hopefully-unique document name """
    # Because of length limitations, instead of character stuffing I had
    # to use a well-formed hash hex value
    return md5.new(url).hexdigest()

def get_path_from_filename( filename ):
    """ Get a two-level path from a filename """
    first_level = filename[len(filename)-2:len(filename)]
    second_level = filename[len(filename)-4:len(filename)-2]
    return "%s/%s" % (first_level,second_level)
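
# Example: a digest filename ending in "...93af" yields the two-level path "af/93",
# spreading files across up to 256 x 256 hash-derived subdirectories.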

def save_file(doc_contents, doc_directory, doc_url):
    """ Save a document that came in from a zpf.
        The document should be put into the specified directory, and
        the filename of the document must be based on the specified url.
    """
    file_name = convert_url_to_filename(doc_url)
    directory_path = "%s/%s" % (doc_directory,get_path_from_filename(file_name))
    # Make sure path exists; it may already have been created for an earlier document
    try:
        os.makedirs(directory_path)
    except OSError,e:
        pass
    file_name = "%s/%s.htm" % (directory_path,file_name)
    # Are these binary?  No harm in presuming that...
    fhandle = open(file_name,"wb")
    try:
        fhandle.write(doc_contents)
    finally:
        fhandle.close()
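
# The full on-disk path therefore has the shape
#   <doc_directory>/<last 2 hex chars>/<previous 2 hex chars>/<md5(url)>.htm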

class ZPF_save(ZPF_Filter.ZPF_filter):
    """ ZPF filter implementation that saves every document it sees to disk """

    def __init__(self, directory):
        self.directory = directory

    def handle_metadata(self, zpfin, zpfout, str, vals, mvals):
        self.url = vals.group("url")
        # we don't really need creation time
        return 1

    def handle_doc(self, zpfin, zpfout, doclen, flags):
        """ Takes the input and output streams, a length, and flags that we can ignore """
        doc = zpfin.read(doclen)
        if len(doc):
            save_file(doc, self.directory, self.url)
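
# ZPF_Filter.ZPF_filter is assumed to drive these callbacks while parsing the stream:
# handle_metadata() for each document's metadata record (only the url is retained here)
# and handle_doc() for each document body, which ZPF_save writes straight to disk.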

def save_zpf_pipe_files():
    """ Generate files by receiving them over stdin as a zpf """
    # cheat a bit - gunzip will inherit our stdin, we just want stdout...
    zfile = os.popen("gunzip", "r")
    z = ZPF_save("/common/crawlarea")
    z.scan_zpf(zfile,None)
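
# Typical invocation for this mode would be something like the following, where the
# dump file name is illustrative only (any gzip-compressed zpf on stdin works):
#   python <this script> zpf-pipe < crawl_dump.zpf.gz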

def preclean( print_errors=True ):
    ''' Clean up everything we might have done during the execution of this test.
        This will include all jobs and ingested documents. '''
    try:
        ConnectorHelpers.start_leafblower()
    except Exception, e:
        if print_errors:
            print "Error restarting ingestion"
            print e
    try:
        ConnectorHelpers.reset_all()
    except Exception, e:
        if print_errors:
            print "Error resetting all jobs"
            print e
    # Remove test documents first
    for folder in [ "/common/crawlarea" ]:
        try:
            delete_folder( folder )
        except Exception, e:
            if print_errors:
                print "Error removing %s" % folder
                print e
    try:
        LicenseMakerClient.revoke_license()
    except Exception, e:
        if print_errors:
            print "Error revoking license"
            print e
    try:
        ConnectorHelpers.delete_crawler_user( username )
    except Exception, e:
        if print_errors:
            print "Error removing crawler user"
            print e
    try:
        ConnectorHelpers.teardown_connector_environment( )
    except Exception, e:
        if print_errors:
            print "Error cleaning up debs"
            print e
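
# Each preclean step is wrapped in its own try/except so that a failure in one
# teardown action does not prevent the remaining cleanup from being attempted.
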
# Main
if __name__ == '__main__':

    # Look for a mode to indicate whether to populate our own data, or get it passed to us in a zpf
    mode = "local"
    if len(sys.argv) > 1:
        # Mode is "local" or "zpf-pipe"
        mode = sys.argv[1]

    print "Precleaning!"
    preclean( print_errors=False )

    print "Setting up file area."
    if mode == "local":
        save_local_files()
    elif mode == "zpf-pipe":
        # The documents will be piped to us as a zpf over stdin; use techniques borrowed from bulk_zpf_ingestion.py accordingly
        os.mkdir("/common/crawlarea")
        save_zpf_pipe_files()
    else:
        raise Exception("Illegal test operating mode: %s" % mode)

    print "Setup Connector Environment."
    ConnectorHelpers.setup_connector_environment()

    print "Creating crawler user."
    ConnectorHelpers.create_crawler_user( username, password )
    ConnectorHelpers.define_gts_outputconnection( )

    # Before we begin, do postgres maintenance to be sure we start in the same place each time
    ConnectorHelpers.run_maintenance()

    # PHASE 1: Ingestion

    print "Scale Test."

    # Define repository connection
    ConnectorHelpers.define_filesystem_repository_connection_ui( username, password,
                                        "FileSystem",
                                        "FileSystem Connection",
                                        max_connections=100 )

    # Define job
    job_id = ConnectorHelpers.define_filesystem_job_ui( username, password,
                                        "Test job",
                                        "FileSystem",
                                        [ ("/common/crawlarea", [( "include", "file", "*.htm" ), ( "include", "directory", "*" )]) ],
                                        hop_mode="neverdelete" )
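    # The single path spec above crawls /common/crawlarea, including every *.htm file
    # and descending into all subdirectories; hop_mode="neverdelete" is presumed to
    # disable hopcount-based document deletion for the duration of the crawl.
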
    # Run the job
    ConnectorHelpers.start_job( job_id )

    # The test requires about 14 days to run.  We need to do postgres maintenance during that
    # time to keep performance up to standards.  I've chosen to do it every 3 days...
    while True:
        abort = False
        interval_begin = time.time()
        # Poll the job status every 10 minutes for up to 3 days (259,200 seconds)
        while time.time() - interval_begin < 259200.0:
            time.sleep(600)
            status = ConnectorHelpers.get_job_status_ui( username, password, job_id )
            if status != "Starting up" and status != "Running":
                abort = True
                break
        if abort:
            break
        ConnectorHelpers.run_maintenance()

    # Now, let job complete
    ConnectorHelpers.wait_job_complete( job_id )

    # Success: done
    print "Done ingestion scale test."

    # PHASE 2: Cleanup test

    print "Cleanup Scale Test."
    ConnectorHelpers.delete_job( job_id )
    print "...job delete request sent"
    ConnectorHelpers.wait_job_deleted( job_id )
    print "...job has vanished"
    print "Done Cleanup Scale Test."

    ConnectorHelpers.delete_repositoryconnection( "FileSystem" )
    ConnectorHelpers.delete_gts_outputconnection( )
    ConnectorHelpers.delete_crawler_user( username )
    delete_folder("/common/crawlarea")
    LicenseMakerClient.revoke_license()
    ConnectorHelpers.teardown_connector_environment( )
    print "Scale ConnectorFramework test PASSED"