blob: 7a339a60d8a21280976ce49c907578b04e951388 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.eagle.topology;
import com.typesafe.config.Config;
import org.apache.eagle.topology.resolver.TopologyRackResolver;
import org.apache.eagle.topology.resolver.impl.DefaultTopologyRackResolver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
public class TopologyCheckAppConfig implements Serializable {
public static final String TOPOLOGY_DATA_FETCH_SPOUT_NAME = "topologyDataFetcherSpout";
public static final String TOPOLOGY_ENTITY_PERSIST_BOLT_NAME = "topologyEntityPersistBolt";
public static final String PARSE_BOLT_NAME = "parserBolt";
public static final String SINK_BOLT_NAME = "sinkBolt";
private static final int MAX_NUM_THREADS = 10;
private static final String HBASE_ZOOKEEPER_CLIENT_PORT = "2181";
private static final Logger LOG = LoggerFactory.getLogger(TopologyCheckAppConfig.class);
public DataExtractorConfig dataExtractorConfig;
public HBaseConfig hBaseConfig;
public HdfsConfig hdfsConfig;
public MRConfig mrConfig;
public List<TopologyConstants.TopologyType> topologyTypes;
private Config config;
private TopologyCheckAppConfig(Config config) {
hBaseConfig = null;
hdfsConfig = null;
mrConfig = null;
dataExtractorConfig = new DataExtractorConfig();
topologyTypes = new ArrayList<>();
init(config);
}
public Config getConfig() {
return config;
}
public static TopologyCheckAppConfig newInstance(Config config) {
return new TopologyCheckAppConfig(config);
}
private void init(Config config) {
this.config = config;
this.dataExtractorConfig.site = config.getString("siteId");
this.dataExtractorConfig.fetchDataIntervalInSecs = config.getLong("topology.fetchDataIntervalInSecs");
this.dataExtractorConfig.parseThreadPoolSize = MAX_NUM_THREADS;
if (config.hasPath("topology.parseThreadPoolSize")) {
this.dataExtractorConfig.parseThreadPoolSize = config.getInt("topology.parseThreadPoolSize");
}
this.dataExtractorConfig.numDataFetcherSpout = config.getInt("topology.numDataFetcherSpout");
this.dataExtractorConfig.numEntityPersistBolt = config.getInt("topology.numEntityPersistBolt");
this.dataExtractorConfig.numKafkaSinkBolt = config.getInt("topology.numOfKafkaSinkBolt");
this.dataExtractorConfig.resolverCls = DefaultTopologyRackResolver.class;
if (config.hasPath("topology.rackResolverCls")) {
String resolveCls = config.getString("topology.rackResolverCls");
try {
this.dataExtractorConfig.resolverCls = (Class<? extends TopologyRackResolver>) Class.forName(resolveCls);
} catch (ClassNotFoundException e) {
LOG.warn("{} is not found, will use DefaultTopologyRackResolver instead", resolveCls);
}
}
if (config.hasPath("dataSourceConfig.hbase.enabled") && config.getBoolean("dataSourceConfig.hbase.enabled")) {
topologyTypes.add(TopologyConstants.TopologyType.HBASE);
hBaseConfig = new HBaseConfig();
hBaseConfig.zkQuorum = config.getString("dataSourceConfig.hbase.zkQuorum");
hBaseConfig.zkRoot = config.getString("dataSourceConfig.hbase.zkZnodeParent");
hBaseConfig.zkClientPort = getOptionalConfig("dataSourceConfig.hbase.zkPropertyClientPort", HBASE_ZOOKEEPER_CLIENT_PORT);
hBaseConfig.zkRetryTimes = getOptionalConfig("dataSourceConfig.hbase.zkRetryTimes", "5");
hBaseConfig.eagleKeytab = getOptionalConfig("dataSourceConfig.hbase.kerberos.eagle.keytab", null);
hBaseConfig.eaglePrincipal = getOptionalConfig("dataSourceConfig.hbase.kerberos.eagle.principal", null);
hBaseConfig.hbaseMasterPrincipal = getOptionalConfig("dataSourceConfig.hbase.kerberos.master.principal", null);
}
if (config.hasPath("dataSourceConfig.mr.enabled") && config.getBoolean("dataSourceConfig.mr.enabled")) {
topologyTypes.add(TopologyConstants.TopologyType.MR);
mrConfig = new MRConfig();
mrConfig.rmUrls = config.getString("dataSourceConfig.mr.rmUrl").split(",\\s*");
mrConfig.historyServerUrl = getOptionalConfig("dataSourceConfig.mr.historyServerUrl", null);
}
if (config.hasPath("dataSourceConfig.hdfs.enabled") && config.getBoolean("dataSourceConfig.hdfs.enabled")) {
topologyTypes.add(TopologyConstants.TopologyType.HDFS);
hdfsConfig = new HdfsConfig();
hdfsConfig.namenodeUrls = config.getString("dataSourceConfig.hdfs.namenodeUrl").split(",\\s*");
}
}
public static class DataExtractorConfig implements Serializable {
public String site;
public int numDataFetcherSpout;
public int numEntityPersistBolt;
public int numKafkaSinkBolt;
public long fetchDataIntervalInSecs;
public int parseThreadPoolSize;
public Class<? extends TopologyRackResolver> resolverCls;
}
public static class HBaseConfig implements Serializable {
public String zkQuorum;
public String zkRoot;
public String zkClientPort;
public String zkRetryTimes;
public String hbaseMasterPrincipal;
public String eaglePrincipal;
public String eagleKeytab;
}
public static class MRConfig implements Serializable {
public String[] rmUrls;
public String historyServerUrl;
}
public static class HdfsConfig implements Serializable {
public String[] namenodeUrls;
}
private String getOptionalConfig(String key, String defaultValue) {
if (this.config.hasPath(key)) {
return config.getString(key);
} else {
return defaultValue;
}
}
}