背景:计算实时保费需要用到保单表和明细表以及4长维表,spark struct stream 调优之前计算所需10分钟。这严重超过了预期的限制因此需要进行相关的调优,将调优的过程记录如下: 调优前的脚本如下:
BASE_PATH=/data01/spark-app-deploy/mobile-premium-streaming
APP_PATH=${BASE_PATH}/mobile-premium-streaming
for i in ./lib/*.jar
do
CLASS_PATH=$i,${CLASS_PATH}
done
length=`echo ${CLASS_PATH}|wc -L`
CLASS_PATH=`echo ${CLASS_PATH:0:$length-1}`
WRITE_TOPICNAME=MOBILE_PREMIUM_JOIN_9
spark-submit --master yarn \
--deploy-mode cluster \
--conf spark.yarn.maxAppAttempts=4 \
#--conf spark.sql.streaming.stateStore.providerClass=com.paic.RocksDbStateStoreProvider \
#--conf spark.sql.streaming.stateStore.minDeltasForSnapshot=2 \
#--conf spark.sql.streaming.minBatchesToRetain=3 \
--conf spark.app.name=finance-premium-calculator \
--conf spark.streaming.stopGracefullyOnShutdown=true \
--conf spark.sql.streaming.stateStore.minDeltasForSnapshot=3 \
--conf spark.yarn.am.attemptFailuresValidityInterval=1h \
--conf spark.yarn.max.executor.failures=32 \
--conf spark.yarn.executor.failuresValidityInterval=1h \
--conf spark.task.maxFailures=80 \
#--conf spark.memory.offHeap.enabled=true \
#--conf spark.memory.offHeap.size=8589934592 \
#--conf spark.core.connection.ack.wait.timeout=60 \
#--conf spark.default.parallelism=1000 \
--conf spark.memory.fraction=0.8 \
--conf spark.memory.storageFraction=0.1 \
--conf spark.sql.shuffle.partitions=6 \
--conf spark.hadoop.fs.hdfs.impl.disable.cache=true \
--conf spark.cleaner.periodicGC.interval=5min \
--conf spark.cleaner.referenceTracking.blocking.shuffle=true \
--conf spark.cleaner.referenceTracking.cleanCheckpoints=true \
#--conf spark.speculation=true \
#--conf spark.shuffle.service.enabled=true \
#--conf spark.shuffle.service.port=7337 \
#--conf "spark.driver.extraJavaOptions=-XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:ParallelGCThreads=2 -XX:+CMSParallelRemarkEnabled -XX:+ParallelRefProcEnabled -XX:+CMSClassUnloadingEnabled -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+PrintHeapAtGC -XX:+HeapDumpOnOutOfMemoryError -verbose:gc -XX:MaxTenuringThreshold=15 " \
--conf "spark.executor.extraJavaOptions=-Xms12G -XX:+UseParallelGC -XX:+UseParallelOldGC -XX:ParallelGCThreads=6 -XX:NewRatio=5 -XX:SurvivorRatio=1 -XX:MetaspaceSize=256M -XX:+UnlockCommercialFeatures -XX:+FlightRecorder -XX:StartFlightRecording=filename=executor.jfr,duration=900s,settings=profile" \
--queue default \
--driver-class-path ${BASE_PATH}/lib/mysql-connector-java-5.1.8.jar \
--executor-memory=12g \
--driver-memory 4g \
--driver-cores 4 \
--num-executors=6 \
--total-executor-cores 6 \
--jars ${CLASS_PATH} \
--class com.paic.FinacePremium \
${BASE_PATH}/realtime-premium-streaming-1.0-SNAPSHOT.jar ${WRITE_TOPICNAME}
1 在调优之前我仔细观察了spark-sql计算的相关参数,发现每次两路流关联的时候读取的数据量比较大每条流读取的数据量在1万条并且shuffle的时间比较长,因此在仔细研究了spark-kafka的相关参数后加上如下参数:
.option("kafkaConsumer.pollTimeoutMs","500")
.option("failOnDataLoss","false")
.option("maxOffsetsPerTrigger",maxOffsetsPerTrigger)
减小每次读的数据量,进行少量多批而不是一批读完延时很大,数据计算的时间从10分钟减少到2-3分钟的时间。继续调优业务希望5s 2 我们发现很多时候shuffle read会比较慢 毕竟流join的代价有点大(这个后面再聊)。而且会经常出现executor-failed等状况。原以为调节
--conf spark.core.connection.ack.wait.timeout=60
可以减少,但事实是等待的时间反而变长不利于快速计算于是从300->60,同时开启
--conf spark.speculation=true
会做task的推断如果执行出现问题会在别的executor帮忙完成该任务,避免长时间gc,以及重试带来的后果。 同时开启shuffle service服务 添加
--conf spark.shuffle.service.enabled=true \
--conf spark.shuffle.service.port=7337 \
shuffle service 是为了避免shuffle 过程中gc而对任务的执行产生影响。启用单独的进程进行处理。 3 在进行长时间运行之后有一天driver居然hang 住了。于是配置driver的core和gc
--conf "spark.driver.extraJavaOptions=-XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:ParallelGCThreads=2 -XX:+CMSParallelRemarkEnabled -XX:+ParallelRefProcEnabled -XX:+CMSClassUnloadingEnabled -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+PrintHeapAtGC -XX:+HeapDumpOnOutOfMemoryError -verbose:gc -XX:MaxTenuringThreshold=15 "
4 接下来就是进行gc调优了,而且存在每隔30分钟就有个任务执行特别长,在查看了日志后我们发现有executor长时间gc。 一开始gc什么参数都没有: 我们通过jfr
存在24次的gc那么自然而然的我想要将old提升:
-Xms12G -XX:+UseParallelGC -XX:+UseParallelOldGC -XX:ParallelGCThreads=6 -XX:NewRatio=3
而且可以看到堆的使用是长时间高位的因此增加内存到12G,同时增加parallelGcthread
调整完成后gc的表现状况如下:
除第一次gc时间较长之外gc的次数明显减少。而且计算的时间缩减到均值20s附近。
同时查看堆中很多都是statestore占用的堆。很明显=多路流关联做了很多的缓存和snapshot,有时间再继续调优这块以及将gc的次数再减少。
5 statestore 占用的内存多,很明显是由于窗口中存留的数据造成的,想要减小statestore堆的大小需要定时清理GroupState中过期没有更新的数据。https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/streaming/GroupState.html 根据业务的描述以及架构的设计,我们可以将过期失效的实践定为24h,定期清理过期数据,防止state 不断增大
本文由 妖言君 创作,采用 知识共享署名4.0 国际许可协议进行许可
本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名
最后编辑时间为: Feb 21, 2021 at 02:42 pm