#!/usr/bin/env bash
test -f /etc/profile && . /etc/profile
test -f "$HOME/.bash_profile" && . "$HOME/.bash_profile"
base=$(cd "$(dirname "$0")" && pwd)
# Verify that a command (gawk by default) is installed; abort otherwise.
function __check__(){
local cmd=${1:-"gawk"}
if ! which "$cmd" >/dev/null 2>&1;then
echo "Please install $cmd" >&2
exit 1
fi
}
# Print a banner block: a row of '#', each line of the message centered between '#' borders, and a closing row of '#'.
function create_notes(){
local msg=$1
local col=${2:-55}
__check__
gawk -v c="$col" 'function as(ll,is,ls){for(i=1;i<ll;i++){ls=ls""is};return ls};
BEGIN{print as(c,"#")}'
echo -e "$msg" | gawk -v c="$col" 'function as(ll,is,ls){for(i=1;i<ll;i++){ls=ls""is};return ls};
{m=$0;l=(c-length(m)-2)/2;s=as(l-1," ");s1=as(c-length("#"s""m)-1," ");print "#"s""m""s1"#"}'
gawk -v c="$col" 'function as(ll,is,ls){for(i=1;i<ll;i++){ls=ls""is};return ls};
BEGIN{print as(c,"#")}'
}
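# For illustration only (assuming the default width of 55), a call such as
#   create_notes "\nMy Section\n"
# prints a banner roughly like:
#   ######################################################
#   #                                                    #
#   #                     My Section                     #
#   #                                                    #
#   ######################################################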
# Prefix every line of the given text with '#' so it becomes a properties-file comment.
function _n(){
echo -e "$@" | awk '{print "#"$0}'
}
# Emit one configuration entry: the notes as '#' comments followed by "<prefix>.<key>=<value>".
# 2 args: key, value (prefix taken from $Lprefix, default "spark").
# 3 args: notes, key, value. 4 args: prefix, notes, key, value. 1 arg: print the text as a comment only.
function _p(){
local ph notes k v
case $# in
2)
ph=${Lprefix:-"spark"};notes="#\n";k=$1;v=$2;;
3)
ph=${Lprefix:-"spark"};notes="$1";k=$2;v=$3;;
4)
ph=$1;notes="$2";k=$3;v=$4;;
1)
_n "$@";echo;return;;
*)
echo;return;;
esac
echo -e "$(_n "$notes")\n${ph}.${k}=${v}\n"
}
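# For illustration only (hypothetical key and value; with no Lprefix set the prefix defaults to "spark"):
#   _p "maximum retries\ndefault=3" "max.retries" "3"
# prints:
#   #maximum retries
#   #default=3
#   spark.max.retries=3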
#Configuration items required by the submit script run.sh
function user_run_params(){
local Lprefix="spark.run"
_p "Required. Fully qualified name of the main class to run" "main" " "
_p "Required. Name of the jar containing the main class\nthe jar file must be located under lib.path" "main.jar" " "
_p "Command line arguments passed to the main class, separated by commas\narguments must not contain spaces or other whitespace\nEx:param1,param2,.." \
"self.params" " "
_p "Directory holding the user code's dependency jars\neither an absolute path or a path relative to this config file; relative paths are resolved automatically" "lib.path" " "
}
#Basic configuration required to submit a spark job
function spark_run_params(){
_p "Cluster manager to submit to; normally left as YARN" "master" "yarn"
_p "YARN deploy mode\ndefault=cluster" "submit.deployMode" "cluster"
_p "Interval between spark-streaming batches\ndefault=300" "batch.duration" "300"
_p "YARN queue the spark job is submitted to\ndefault=default" "yarn.queue" "default"
_p "Spark application name; keep it globally unique if possible\nso that failure handling can key off the name when the job fails\nif unset, the full class name plus .App is used" \
"app.name" ""
_p "Spark network serializer; the default JavaSerializer handles all types but is slow\nthe recommended Kryo serializer is used here\nkafka-0.10 requires this serializer" \
"serializer" "org.apache.spark.serializer.KryoSerializer"
_p "++++++++++++++++++++++Driver settings+++++++++++++++++++++++++++"
local Lprefix="spark.driver"
_p "Memory used by the driver\ndefault=512MB" "memory" "512MB"
_p "Number of cpu cores used by the driver\ndefault=1" "cores" "1"
_p "When spark jars and user jars conflict on the driver classpath, prefer the user-provided ones\nexperimental, effective in cluster mode only\ndefault=false" \
"userClassPathFirst" "false"
_p "++++++++++++++++++++++Executor settings+++++++++++++++++++++++++"
Lprefix="spark.executor"
_p "Number of executors\ndefault=1" "instances" "1"
_p "Number of cpu cores per executor\ndefault=1" "cores" "1"
_p "Memory per executor\ndefault=512MB" "memory" "512MB"
_p "Same as the driver setting above, but for executors\ndefault=false" "userClassPathFirst" "false"
}
#Dynamic resource allocation settings for spark jobs
function spark_dynamic_params(){
_p "++++++++++++++++++++++++Executor dynamic allocation settings++++++++++++++++++++"
local Lprefix="spark.shuffle.service"
_p "External shuffle service required before executor dynamic allocation can be used\ndefault=false" "enabled" "true"
_p "Port of the shuffle service; it is configured in yarn-site and started by the NodeManager\ndefault=7337" "port" "7337"
Lprefix="spark.dynamicAllocation"
_p "Whether to enable dynamic resource allocation for executors; requires support in the yarn cluster configuration\ndefault=false" \
"enabled" "true"
_p "How long an executor may stay idle before it is released\ndefault=60s" "executorIdleTimeout" "60s"
_p "Idle timeout for executors holding cached data\ndefault=infinity(never released)" "cachedExecutorIdleTimeout" "-1"
_p "Initial number of executors; if spark.executor.instances is also set, the smaller value wins\ndefault=minExecutors(used when this is not set)" \
"initialExecutors" "1"
_p "Upper bound on the number of dynamically allocated executors\ndefault=infinity" "maxExecutors" "60"
_p "Lower bound the executor count may shrink to\ndefault=0" "minExecutors" "1"
_p "How long batches must stay backlogged before new executors are requested\ndefault=1s" "schedulerBacklogTimeout" "1s"
_p "Same as above, but for subsequent requests\ndefault=SchedulerBacklogTimeout(used when this is not set)" \
"sustainedSchedulerBacklogTimeout" "1s"
}
#Basic configuration required to consume kafka
function streamx_spark_source_kafka(){
create_notes "\nStreamX-Spark Kafka Source\nbase config\n"
_p "Everything under spark.source.kafka.consume is standard kafka consumer configuration"
local Lprefix="spark.source.kafka.consume"
_p "Kafka topics to consume; multiple topics are separated by commas [,]\ndefault=" "topics" ""
_p "Kafka consumer group id\ndefault=kafka.consumer.001" "group.id" "kafka.consumer.001"
_p "Kafka cluster hosts and ports; multiple hosts are separated by commas [,]\ndefault=" "bootstrap.servers" ""
_p "Where to start consuming a kafka topic on the first run
two options: latest (newest offset), earliest (oldest offset)\ndefault=earliest" "auto.offset.reset" "earliest"
_p "spark.source.kafka" "How spark manages kafka offsets while consuming
three options: hbase, redis, kafka, each backed by the corresponding store\ndefault=kafka" "offset.store.type" "kafka"
_p "To customize how spark manages kafka offsets, specify a custom class name here\nspark.source.kafka.offset.store.class=none"
_p "Key deserializer used by newer kafka versions\ndefault=java.Serialization" \
"key.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"
_p "Value deserializer used by newer kafka versions\ndefault=java.Serialization" \
"value.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"
_p "Maximum amount of data fetched per request; the kafka server must allow this size\ndefault=10485760" "max.partition.fetch.bytes" "10485760"
_p "Maximum time to wait for a fetch request\ndefault=3000" "fetch.max.wait.ms" "3000"
}
function streamx_spark_sink_redis(){
create_notes "\nStreamX-Spark Redis Sink\nbase config\n"
_p "Settings required by the StreamX-Spark redis sink"
local Lprefix="spark.sink.redis"
_p "Redis host" "host" ""
_p "Redis port" "port" "6379"
_p "Redis database" "db" "0"
_p "Redis connection timeout" "timeout" "30"
}
function streamx_spark_sink_influx(){
create_notes "\nStreamX-Spark InfluxDB Sink\nbase config\n"
_p "Settings required by the StreamX-Spark influxDB sink"
local Lprefix="spark.sink.influxDB"
_p "InfluxDB host" "host" ""
_p "InfluxDB port" "port" "8086"
_p "InfluxDB database" "db" ""
}
function streamx_spark_congestion_monitor(){
create_notes "\nStreamX-Spark Monitor\nCongestion base config\n"
_p "Parameters required by the built-in StreamX-Spark congestion monitor"
local Lprefix="spark.monitor.congestion"
_p "DingTalk robot api url for sending alert messages; must be the full url starting with http" "send.api" ""
_p "Number of backlogged batches before alerting; 0 disables alerting\ndefault=0" "batch" "0"
_p "Phone number the DingTalk contact registered with" "ding.to" ""
Lprefix="spark.monitor.suicide"
_p "Number of backlogged batches before the job kills itself; 0 disables the kill\ncombined with automatic restart this can recover a congested job\ndefault=0" "batch" "0"
}
function create_default(){
create_notes "\nspark process startup.sh\nuser config\n"
user_run_params
create_notes "\nspark self config\n"
spark_run_params
spark_dynamic_params
#create_notes "\nspark process\nSource config\n"
#_p;_p;_p;
#create_notes "\nspark process\nSink config\n"
#_p;_p;_p;
}
create_default
streamx_spark_congestion_monitor
streamx_spark_source_kafka
streamx_spark_sink_redis
#streamx_spark_sink_influx
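# Typical usage (a sketch; the script file name below is an assumption, substitute the actual name):
#   bash conf.sh > spark-application.properties
# The redirected output is a commented properties file covering the spark, kafka source, redis sink
# and congestion monitor settings generated above.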