| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pirk.responder.wideskies; |
| |
| import java.util.Arrays; |
| import java.util.List; |
| |
| import org.apache.pirk.inputformat.hadoop.InputFormatConst; |
| import org.apache.pirk.schema.data.DataSchemaLoader; |
| import org.apache.pirk.schema.query.QuerySchemaLoader; |
| import org.apache.pirk.utils.SystemConfiguration; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Properties constants and validation for the Responder |
| */ |
| public class ResponderProps |
| { |
| private static final Logger logger = LoggerFactory.getLogger(ResponderDriver.class); |
| |
| // Required properties |
| public static final String PLATFORM = "platform"; |
| public static final String QUERYINPUT = "pir.queryInput"; |
| public static final String DATAINPUTFORMAT = "pir.dataInputFormat"; |
| public static final String OUTPUTFILE = "pir.outputFile"; |
| |
| // Optional properties |
| public static final String INPUTDATA = "pir.inputData"; |
| public static final String BASEQUERY = "pir.baseQuery"; |
| public static final String ESRESOURCE = "pir.esResource"; |
| public static final String ESQUERY = "pir.esQuery"; |
| public static final String BASEINPUTFORMAT = "pir.baseInputFormat"; |
| public static final String STOPLISTFILE = "pir.stopListFile"; |
| public static final String QUERYSCHEMAS = "responder.querySchemas"; |
| public static final String DATASCHEMAS = "responder.dataSchemas"; |
| public static final String NUMEXPLOOKUPPARTS = "pir.numExpLookupPartitions"; |
| public static final String USELOCALCACHE = "pir.useLocalCache"; |
| public static final String LIMITHITSPERSELECTOR = "pir.limitHitsPerSelector"; |
| public static final String MAXHITSPERSELECTOR = "pir.maxHitsPerSelector"; |
| public static final String NUMCOLMULTPARTITIONS = "pir.numColMultPartitions"; |
| public static final String USEMODEXPJOIN = "pir.useModExpJoin"; |
| public static final String COLMULTREDUCEBYKEY = "pir.colMultReduceByKey"; |
| static final String NUMREDUCETASKS = "pir.numReduceTasks"; |
| static final String MAPMEMORY = "mapreduce.map.memory.mb"; |
| static final String REDUCEMEMORY = "mapreduce.reduce.memory.mb"; |
| static final String MAPJAVAOPTS = "mapreduce.map.java.opts"; |
| static final String REDUCEJAVAOPTS = "mapreduce.reduce.java.opts"; |
| static final String USEHDFSLOOKUPTABLE = "pir.useHDFSLookupTable"; |
| static final String NUMDATAPARTITIONS = "pir.numDataPartitions"; |
| static final String ALLOWEMBEDDEDQUERYSCHEMAS = "pir.allowEmbeddedQuerySchemas"; |
| |
| static final List<String> PROPSLIST = Arrays.asList(PLATFORM, QUERYINPUT, DATAINPUTFORMAT, INPUTDATA, BASEQUERY, ESRESOURCE, ESQUERY, OUTPUTFILE, |
| BASEINPUTFORMAT, STOPLISTFILE, NUMREDUCETASKS, USELOCALCACHE, LIMITHITSPERSELECTOR, MAXHITSPERSELECTOR, MAPMEMORY, REDUCEMEMORY, MAPJAVAOPTS, |
| REDUCEJAVAOPTS, QUERYSCHEMAS, DATASCHEMAS, NUMEXPLOOKUPPARTS, USEHDFSLOOKUPTABLE, NUMDATAPARTITIONS, NUMCOLMULTPARTITIONS, USEMODEXPJOIN, |
| COLMULTREDUCEBYKEY, ALLOWEMBEDDEDQUERYSCHEMAS); |
| |
| /** |
| * Validates the responder properties |
| * |
| */ |
| static boolean validateResponderProperties() |
| { |
| boolean valid = true; |
| |
| // Parse general required options |
| |
| if (!SystemConfiguration.hasProperty(PLATFORM)) |
| { |
| logger.info("Must have the option " + PLATFORM); |
| valid = false; |
| } |
| |
| String platform = SystemConfiguration.getProperty(PLATFORM).toLowerCase(); |
| if (!platform.equals("mapreduce") && !platform.equals("spark") && !platform.equals("standalone")) |
| { |
| logger.info("Unsupported platform: " + platform); |
| valid = false; |
| } |
| |
| if (!SystemConfiguration.hasProperty(QUERYINPUT)) |
| { |
| logger.info("Must have the option " + QUERYINPUT); |
| valid = false; |
| } |
| |
| if (!SystemConfiguration.hasProperty(OUTPUTFILE)) |
| { |
| logger.info("Must have the option " + OUTPUTFILE); |
| valid = false; |
| } |
| |
| if (!SystemConfiguration.hasProperty(DATAINPUTFORMAT)) |
| { |
| logger.info("Must have the option " + DATAINPUTFORMAT); |
| valid = false; |
| } |
| String dataInputFormat = SystemConfiguration.getProperty(DATAINPUTFORMAT).toLowerCase(); |
| |
| // Parse required properties by dataInputFormat |
| |
| if (dataInputFormat.equals(InputFormatConst.BASE_FORMAT)) |
| { |
| if (!SystemConfiguration.hasProperty(BASEINPUTFORMAT)) |
| { |
| logger.info("For base inputformt: Must have the option " + BASEINPUTFORMAT + " if using " + InputFormatConst.BASE_FORMAT); |
| valid = false; |
| } |
| |
| if (!SystemConfiguration.hasProperty(INPUTDATA)) |
| { |
| logger.info("For base inputformt: Must have the option " + INPUTDATA + " if using " + InputFormatConst.BASE_FORMAT); |
| valid = false; |
| } |
| |
| if (!SystemConfiguration.hasProperty(BASEQUERY)) |
| { |
| SystemConfiguration.setProperty("BASEQUERY", "?q=*"); |
| } |
| } |
| else if (dataInputFormat.equals(InputFormatConst.ES)) |
| { |
| if (!SystemConfiguration.hasProperty(ESRESOURCE)) |
| { |
| logger.info("For ElasticSearch inputformt: Must have the option " + ESRESOURCE); |
| valid = false; |
| } |
| |
| if (!SystemConfiguration.hasProperty(ESQUERY)) |
| { |
| logger.info("For ElasticSearch inputformat: Must have the option " + ESQUERY); |
| valid = false; |
| } |
| } |
| else if (dataInputFormat.equalsIgnoreCase("standalone")) |
| { |
| if (!SystemConfiguration.hasProperty(INPUTDATA)) |
| { |
| logger.info("Must have the option " + INPUTDATA + " if using " + InputFormatConst.BASE_FORMAT); |
| valid = false; |
| } |
| } |
| else |
| { |
| logger.info("Unsupported inputFormat = " + dataInputFormat); |
| valid = false; |
| } |
| |
| // Parse optional properties with defaults |
| |
| if (SystemConfiguration.hasProperty(QUERYSCHEMAS)) |
| { |
| SystemConfiguration.appendProperty("query.schemas", SystemConfiguration.getProperty(QUERYSCHEMAS)); |
| } |
| |
| if (SystemConfiguration.hasProperty(DATASCHEMAS)) |
| { |
| SystemConfiguration.appendProperty("data.schemas", SystemConfiguration.getProperty(DATASCHEMAS)); |
| } |
| |
| if (!SystemConfiguration.hasProperty(USEHDFSLOOKUPTABLE)) |
| { |
| SystemConfiguration.setProperty(USEHDFSLOOKUPTABLE, "false"); |
| } |
| |
| if (!SystemConfiguration.hasProperty(USEMODEXPJOIN)) |
| { |
| SystemConfiguration.setProperty(USEMODEXPJOIN, "false"); |
| } |
| |
| if (!SystemConfiguration.hasProperty(NUMDATAPARTITIONS)) |
| { |
| SystemConfiguration.setProperty(NUMDATAPARTITIONS, "1000"); |
| } |
| |
| if (!SystemConfiguration.hasProperty(NUMCOLMULTPARTITIONS)) |
| { |
| SystemConfiguration.setProperty(NUMCOLMULTPARTITIONS, "1000"); |
| } |
| |
| if (!SystemConfiguration.hasProperty(COLMULTREDUCEBYKEY)) |
| { |
| SystemConfiguration.setProperty(COLMULTREDUCEBYKEY, "false"); |
| } |
| |
| if (!SystemConfiguration.hasProperty(ALLOWEMBEDDEDQUERYSCHEMAS)) |
| { |
| SystemConfiguration.setProperty(ALLOWEMBEDDEDQUERYSCHEMAS, "false"); |
| } |
| |
| if (!SystemConfiguration.hasProperty(USELOCALCACHE)) |
| { |
| SystemConfiguration.setProperty(USELOCALCACHE, "true"); |
| } |
| |
| // Load the new local query and data schemas |
| if (valid) |
| { |
| logger.info("loading schemas: dataSchemas = " + SystemConfiguration.getProperty("data.schemas") + " querySchemas = " |
| + SystemConfiguration.getProperty("query.schemas")); |
| try |
| { |
| DataSchemaLoader.initialize(); |
| QuerySchemaLoader.initialize(); |
| |
| } catch (Exception e) |
| { |
| e.printStackTrace(); |
| } |
| } |
| |
| return valid; |
| } |
| } |