blob: 335b567f683e556cd28c02858eaf80fc982477d7 [file] [log] [blame]
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# $Id$
#
# Author: mattmann
# Description: Uses Apache Solr based catalog of code repo to partition,
# based on MIME type the distributed, map-reduce like execution of Apache
# RAT, so that RAT can complete with a reasonable amount of time.
import sys
import json
import getopt
import urllib
from urllib.request import urlopen, Request
from xmlrpc import client
#urllib.request.build_opener(urllib.HTTPHandler(debuglevel=1))
solrPostfix = "/select/?q=mimetype:$type&version=2.2&start=0&rows=10&indent=on&facet=on&facet.field=mimetype&wt=json&fl=filelocation,filename"
solrPostfixByPage = "/select/?q=mimetype:$type&version=2.2&start=$i&rows=$num&indent=on&facet=on&facet.field=mimetype&wt=json&fl=filelocation,filename"
def executeRatJobs(url, num, type, workflowUrl, taskIds):
# for i = 0 to count(records) i+=num
# json = query Solr with solrPostFix.replace($type, type)
# build filepath list
# send filepath and name list to CAS-PGE RAT job
# first get numRecords
if not url.endswith("/"):
url = url + "/"
solrUrl = url+solrPostfix.replace("$type", type)
print("GET "+solrUrl)
numFound = 0
req = Request(solrUrl)
try:
f = urlopen(req)
jsonResp = json.loads(f.read().decode('utf-8'))
numFound = int(jsonResp["response"]["numFound"])
except urllib.error.HTTPError as err:
print("HTTP error(%s)" % (err))
print("Aborting RAT execution")
return
wm = client.Server(workflowUrl)
for i in range(0, numFound, num):
ratSolrUrl = url + solrPostfixByPage.replace("$type", type).replace("$i", str(i)).replace("$num",str(num))
req = Request(ratSolrUrl)
f = urlopen(req)
jsonResp = json.loads(f.read().decode('utf-8'))
docs = jsonResp["response"]["docs"]
metadata = {}
metadata["MimeType"] = type
for doc in docs:
filename = doc["filename"][0]
filelocation = doc["filelocation"][0]
fullpath = None
if not filelocation.endswith("/"):
filelocation = filelocation + "/"
fullpath = filelocation + filename
if "InputFiles" not in metadata:
metadata["InputFiles"] = []
metadata["InputFiles"].append(fullpath)
print("Metadata is "+str(metadata))
wm.workflowmgr.executeDynamicWorkflow([taskIds], metadata)
def get_mime_types(solrUrl):
neg_mimetype = ["image", "application", "text", "video", "audio", "message", "multipart"]
connection = urlopen(solrUrl + "/select?q=*%3A*&rows=0&facet=true&facet.field=mimetype&wt=python&indent=true")
response = eval(connection.read())
mime_count = response["facet_counts"]["facet_fields"]["mimetype"]
stats = {}
for i in range(0, len(mime_count), 2):
if mime_count[i].split("/")[0] not in neg_mimetype:
stats[mime_count[i]] = mime_count[i + 1]
return stats.keys()
def main(argv):
solrUrl=''
numFilesPerJob=0
workflowUrl=''
ratTaskId=''
usage = 'mime_rat_partitioner.py -u <solrUrl> -c <numFilesPerJob> -w <workflowUrl> -t <rat task Id>'
try:
opts, args = getopt.getopt(argv,"hu:c:w:t:",["solrUrl=", "numFilesPerJob=", "workflowUrl=", "ratTaskId="])
except getopt.GetoptError:
print(usage)
sys.exit(2)
for opt, arg in opts:
if opt == '-h':
print(usage)
sys.exit()
elif opt in ("-u", "--solrUrl"):
solrUrl = arg
elif opt in ("-c", "--numFilesPerJob"):
numFilesPerJob = int(arg)
elif opt in ("-w", "--workflowUrl"):
workflowUrl = arg
elif opt in ("-t", "--ratTaskId"):
ratTaskId = arg
if solrUrl == "" or numFilesPerJob == 0 or workflowUrl == "" or ratTaskId == "":
print(usage)
sys.exit()
print("Configured SOLR url: ["+solrUrl+"]")
mimeTypes = get_mime_types(solrUrl)
for type in mimeTypes:
print("Executing RAT for MIME: ["+type+"]: num files per job: ["+str(numFilesPerJob)+"]")
executeRatJobs(solrUrl, numFilesPerJob, type, workflowUrl, ratTaskId)
if __name__ == "__main__":
main(sys.argv[1:])