blob: 6584a37a77af544e487430be957139f8f76a741e [file] [log] [blame]
#!/usr/bin/env ambari-python-wrap
"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import os
import traceback
import imp
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
STACKS_DIR = os.path.join(SCRIPT_DIR, '../../../../../stacks/')
PARENT_FILE = os.path.join(STACKS_DIR, 'service_advisor.py')
try:
with open(PARENT_FILE, 'rb') as fp:
service_advisor = imp.load_module('service_advisor', fp, PARENT_FILE, ('.py', 'rb', imp.PY_SOURCE))
except Exception as e:
traceback.print_exc()
print "Failed to load parent"
class METRON${metron.short.version}ServiceAdvisor(service_advisor.ServiceAdvisor):
def getServiceComponentLayoutValidations(self, services, hosts):
componentsListList = [service["components"] for service in services["services"]]
componentsList = [item["StackServiceComponents"] for sublist in componentsListList for item in sublist]
metronParsersHost = self.getHosts(componentsList, "METRON_PARSERS")[0]
metronEnrichmentMaster = self.getHosts(componentsList, "METRON_ENRICHMENT_MASTER")[0]
metronProfilerHost = self.getHosts(componentsList, "METRON_PROFILER")[0]
metronPcapHost = self.getHosts(componentsList, "METRON_PCAP")[0]
metronIndexingHost = self.getHosts(componentsList, "METRON_INDEXING")[0]
metronRESTHost = self.getHosts(componentsList, "METRON_REST")[0]
metronManagementUIHost = self.getHosts(componentsList, "METRON_MANAGEMENT_UI")[0]
metronAlertsUIHost = self.getHosts(componentsList, "METRON_ALERTS_UI")[0]
hbaseClientHosts = self.getHosts(componentsList, "HBASE_CLIENT")
hdfsClientHosts = self.getHosts(componentsList, "HDFS_CLIENT")
zookeeperClientHosts = self.getHosts(componentsList, "ZOOKEEPER_CLIENT")
kafkaBrokers = self.getHosts(componentsList, "KAFKA_BROKER")
stormSupervisors = self.getHosts(componentsList, "SUPERVISOR")
items = []
# Metron Must Co-locate with KAFKA_BROKER and STORM_SUPERVISOR
if metronParsersHost not in kafkaBrokers:
message = "Metron must be colocated with an instance of KAFKA BROKER"
items.append({ "type": 'host-component', "level": 'ERROR', "message": message, "component-name": 'METRON_PARSERS', "host": metronParsersHost })
if metronParsersHost not in stormSupervisors:
message = "Metron must be colocated with an instance of STORM SUPERVISOR"
items.append({ "type": 'host-component', "level": 'WARN', "message": message, "component-name": 'METRON_PARSERS', "host": metronParsersHost })
if metronRESTHost not in stormSupervisors:
message = "Metron REST must be colocated with an instance of STORM SUPERVISOR"
items.append({ "type": 'host-component', "level": 'WARN', "message": message, "component-name": 'METRON_REST', "host": metronRESTHost })
if metronParsersHost != metronRESTHost:
message = "Metron REST must be co-located with Metron Parsers on {0}".format(metronParsersHost)
items.append({ "type": 'host-component', "level": 'WARN', "message": message, "component-name": 'METRON_REST', "host": metronRESTHost })
if metronParsersHost != metronEnrichmentMaster:
message = "Metron Enrichment Master must be co-located with Metron Parsers on {0}".format(metronParsersHost)
items.append({ "type": 'host-component', "level": 'ERROR', "message": message, "component-name": 'METRON_ENRICHMENT_MASTER', "host": metronEnrichmentMaster })
if metronParsersHost != metronIndexingHost:
message = "Metron Indexing must be co-located with Metron Parsers on {0}".format(metronParsersHost)
items.append({ "type": 'host-component', "level": 'ERROR', "message": message, "component-name": 'METRON_INDEXING', "host": metronIndexingHost })
if metronParsersHost != metronPcapHost:
message = "Metron PCAP must be co-located with Metron Parsers on {0}".format(metronParsersHost)
items.append({ "type": 'host-component', "level": 'ERROR', "message": message, "component-name": 'METRON_PCAP', "host": metronPcapHost })
if metronParsersHost != metronProfilerHost:
message = "Metron Profiler must be co-located with Metron Parsers on {0}".format(metronParsersHost)
items.append({ "type": 'host-component', "level": 'ERROR', "message": message, "component-name": 'METRON_PROFILER', "host": metronProfilerHost })
# Enrichment Master also needs ZK Client, but this is already guaranteed by being colocated with Parsers Master
if metronParsersHost not in zookeeperClientHosts:
message = "Metron must be co-located with an instance of Zookeeper Client"
items.append({ "type": 'host-component', "level": 'WARN', "message": message, "component-name": 'METRON_PARSERS', "host": metronParsersHost })
# Enrichment Master also needs HDFS clients, but this is already guaranteed by being colocated with Parsers Master
if metronParsersHost not in hdfsClientHosts:
message = "Metron must be co-located with an instance of HDFS Client"
items.append({ "type": 'host-component', "level": 'WARN', "message": message, "component-name": 'METRON_PARSERS', "host": metronParsersHost })
if metronEnrichmentMaster not in hbaseClientHosts:
message = "Metron Enrichment Master must be co-located with an instance of HBase Client"
items.append({ "type": 'host-component', "level": 'WARN', "message": message, "component-name": 'METRON_ENRICHMENT_MASTER', "host": metronEnrichmentMaster })
if metronManagementUIHost != metronAlertsUIHost:
message = "Metron Alerts UI must be co-located with Metron Management UI on {0}".format(metronManagementUIHost)
items.append({ "type": 'host-component', "level": 'ERROR', "message": message, "component-name": 'METRON_ALERTS_UI', "host": metronAlertsUIHost })
return items
def getServiceConfigurationsValidationItems(self, configurations, recommendedDefaults, services, hosts):
# validate recommended properties in storm-site
siteName = "storm-site"
method = self.validateSTORMSiteConfigurations
items = self.validateConfigurationsForSite(configurations, recommendedDefaults, services, hosts, siteName, method)
return items
def getServiceConfigurationRecommendations(self, configurations, clusterData, services, hosts):
is_secured = self.isSecurityEnabled(services)
#Suggest Storm Rest URL
if "storm-site" in services["configurations"]:
stormUIServerHost = self.getComponentHostNames(services, "STORM", "STORM_UI_SERVER")[0]
stormUIServerPort = services["configurations"]["storm-site"]["properties"]["ui.port"]
stormUIProtocol = "http://"
if "ui.https.port" in services["configurations"]["storm-site"]["properties"]:
stormUIServerPort = services["configurations"]["storm-site"]["properties"]["ui.https.port"]
stormUIProtocol = "https://"
stormUIServerURL = stormUIProtocol + stormUIServerHost + ":" + stormUIServerPort
putMetronEnvProperty = self.putProperty(configurations, "metron-env", services)
putMetronEnvProperty("storm_rest_addr",stormUIServerURL)
storm_site = services["configurations"]["storm-site"]["properties"]
putStormSiteProperty = self.putProperty(configurations, "storm-site", services)
for property, desired_value in self.getSTORMSiteDesiredValues(is_secured).iteritems():
if property not in storm_site:
putStormSiteProperty(property, desired_value)
elif property == "topology.classpath" and storm_site[property] != desired_value:
topologyClasspath = storm_site[property]
#check that desired values exist in topology.classpath. append them if they do not
for path in desired_value.split(':'):
if path not in topologyClasspath:
topologyClasspath += ":" + path
putStormSiteProperty(property,topologyClasspath)
#Suggest Zeppelin Server URL
if "zeppelin-config" in services["configurations"]:
zeppelinServerHost = self.getComponentHostNames(services, "ZEPPELIN", "ZEPPELIN_MASTER")[0]
zeppelinServerPort = services["configurations"]["zeppelin-config"]["properties"]["zeppelin.server.port"]
zeppelinServerUrl = zeppelinServerHost + ":" + zeppelinServerPort
putMetronEnvProperty = self.putProperty(configurations, "metron-env", services)
putMetronEnvProperty("zeppelin_server_url", zeppelinServerUrl)
#Suggest Zookeeper quorum
if "solr-cloud" in services["configurations"]:
zookeeperHost = self.getComponentHostNames(services, "ZOOKEEPER", "ZOOKEEPER_SERVER")[0]
zookeeperClientPort = services["configurations"]["zoo.cfg"]["properties"]["clientPort"]
solrZkDir = services["configurations"]["solr-cloud"]["properties"]["solr_cloud_zk_directory"]
solrZookeeperUrl = zookeeperHost + ":" + zookeeperClientPort + solrZkDir
putMetronEnvProperty = self.putProperty(configurations, "metron-env", services)
putMetronEnvProperty("solr_zookeeper_url", solrZookeeperUrl)
def validateSTORMSiteConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
# Determine if the cluster is secured
is_secured = self.isSecurityEnabled(services)
storm_site = properties
validationItems = []
for property, desired_value in self.getSTORMSiteDesiredValues(is_secured).iteritems():
if property not in storm_site :
message = "Metron requires this property to be set to the recommended value of " + desired_value
item = self.getErrorItem(message) if property == "topology.classpath" else self.getWarnItem(message)
validationItems.append({"config-name": property, "item": item})
elif storm_site[property] != desired_value:
topologyClasspath = storm_site[property]
for path in desired_value.split(':'):
if path not in topologyClasspath:
message = "Metron requires this property to contain " + desired_value
item = self.getErrorItem(message)
validationItems.append({"config-name": property, "item": item})
return self.toConfigurationValidationProblems(validationItems, "storm-site")
def getSTORMSiteDesiredValues(self, is_secured):
storm_site_desired_values = {
"topology.classpath" : "/etc/hbase/conf:/etc/hadoop/conf"
}
return storm_site_desired_values