spark在大数据分析中的填坑姿势
Spark和MapReduce是大数据处理中应用广泛的两套计算框架。
之前我们的数据团队在批处理上,用的是MapReduce框架。在实时统计上,用的是Storm框架。同时我们也面临着一些问题。MapReduce和Storm是两套不同的框架,这带来了一定的学习成本。MapReduce二阶段的编程方式和Storm拓扑的编程方式,使得业务逻辑散落在不同的代码片段中,代码可读性比较差,提高了代码开发和维护的成本。另外,MapReduce在批处理中,也常会遇到因数据量过大而带来的性能问题。
我们希望能够找到一个能够同时支持批处理和实时,并且在批处理的性能上并不逊于MapReduce的技术框架。为此,我们引入了Spark和Spark-streaming。
Spark有以下几点优势。
- Spark支持批处理,Spark-streaming支持流式计算,这让我们可以在批处理和实时处理上统一技术框架,降低技术学习和平台维护的成本;
- 基于Spark的程序开发,提供了丰富的算子和链式编程,提高了编码效率和代码的可读性;
- Spark基于内存的计算方式,性能要显著高于MapReduce基于磁盘的二阶段范式,尤其是在机器学习的场景中,需要对一份数据不断迭代运算时,Spark的性能优势更加显著。
不过,一切,真的都如此美好么?
对Spark的误解
我们先来谈下对Spark理解上的一个坑。
“Spark基于内存的计算方式,性能要显著高于MapReduce基于磁盘的二阶段范式”。
这句话,是诱人的,却也是坑人的。粗略一看,内存比磁盘快很多,误以为Spark全速领跑。
不过任何一个工具的表现,都离不开使用场景。
对于ETL任务,Spark采用的其实也是MapReduce的二阶段范式。
MapReduce二阶段范式最耗时的地方在哪里呢?在shuffle。shuffle有两个操作很耗时,一个是数据在网络上的传输,另一个是中间数据的落盘。实际场景中,数据量一般较大,数据存在集群不同的机器上,网络传输往往无法避免。而内存又比磁盘成本高很多,一般很难分配足够存下所有中间数据的内存,此时,中间数据的落盘也无法避免。
尽管Spark在shuffle阶段也做过一些优化,但网络传输和落盘这两步,终归无法避免。所以说,“显著”的提升,是谈不上的。
讲到这里,其实Spark官方也是鸡贼,首页贴出一个逻辑回归的性能比较图,再加上一个大大的100X faster。拉萌新上车,妥妥的。
在Spark填的那些坑
既然上车了,下面就分享一下在Spark实践中常遇到的坑(主要是针对Spark在ETL任务中的实践)。
坑1-数据倾斜
数据倾斜是MR二阶段范式中最普遍的问题,数据倾斜和shuffle紧密相关。
下图是《Hadoop权威指南》一书中,对MapReduce框架shuffle过程的说明。虽然MapReduce的shuffle和Spark的shuffle在细节实现上有所不同,但却不妨碍我们用这张图来理解shuffle的大致过程。
shuffle的字面意思是混洗,其最终目的是将同一个key下的数据聚合到一起处理。
如上图所示,原始数据被分成很多份,一个map task处理一份。map task从输入中提取出需要的数据,将数据处理成KV的形式。key通过hash的方式被分配到不同的reduce上(实际上是reduce task从map端主动拉取对应key的数据)。最终,同一个key的数据会汇总到同一个reduce中,进行下一步处理。
在shuffle的过程中,当一些key对应的数据过多时,会使对应的reduce task处理的数据量过大,导致数据倾斜。这些负载过高的reduce task的运行时间明显变长,进而拖慢整个任务的执行速度,甚至可能发生内存溢出,导致任务失败。
数据倾斜的解决思路
- 如果资源允许,可以直接增大reduce的并行度或者每个reduce的内存;
- 可以从Spark的算子上进行优化,比如使用reduceByKey取代groupByKey;
- 从业务逻辑着手,提前剔除不需要参与shuffle的异常数据;
- 对数据进行预处理。比如先处理小时数据,再根据小时维度的数据进一步聚合成天数据;
- 结合实际需求,选择合适的算法。如在不需要精确统计UV的场景下,可以选择hyperloglog来实现。
坑2-内存溢出
OOM绝对是坑王,因为它是致命的。OOM和数据倾斜常常粘在一起,这在上一节已有提及。
我们先来了解一下Spark的内存模型。Spark会启动两种JVM进程:driver和executor,本文主要介绍executor的内存模型。JVM的内存可分为堆外内存和堆内内存。Spark又将堆内内存分为存储内存、执行内存和用户内存,而堆外内存则只包含存储内存和执行内存。下图是Spark堆内内存模型。
下面我们分别来介绍各块内存溢出的情况。
- 执行内存的溢出
执行内存用来缓存shuffle中间数据。执行内存的溢出一般是由数据倾斜引起的,数据倾斜使得执行内存需要缓存的数量过大,导致OOM。虽然Spark在预估到执行内存不足的时候,会考虑占用存储内存,或是溢写到磁盘,但是,Spark对执行内存的预估是采样进行的,无法保证预估的准确性和及时性。当缓存的数据超过上次提前申请的内存,Spark又还没来得及进行下一次预估时,就会发生OOM。
执行内存溢出的解决思路和数据倾斜类似,此处不再赘述。 - 存储内存的溢出
存储内存用来缓存RDD或是broadcast数据。当一个RDD被重复使用的时候,我们可能会选择将RDD缓存下来,否则每次使用到这个RDD时,都可能重新计算一遍。当我们在内存中缓存过大的RDD时,就有可能发生OOM。
此时,我们可以调整RDD的缓存等级,由内存缓存调整为内存+磁盘的缓存方式。这样内存不足时,就会溢写到磁盘了。代码中用来缓存RDD的接口是cache()和persist()。通过阅读源码,可以看到cache实际上就是调用的persist(),不过存储级别被默认设置为了MEMORY_ONLY。当需要缓存的RDD过大时,我们可以使用persist来缓存RDD,并根据需求选择恰当的StorageLeval,如MEMORY_AND_DISK。 用户内存的溢出
用户内存用来缓存用户定义的数据。当我们内存使用不当的时候,可能导致用户内存的溢出。
这里举两个内存使用不当的例子及对应的解决思路。- 新建了过多的大对象。有一些数据是共用的(如配置表数据),却在map/reduce端处理每条记录时,都重新加载,并新建一个对象缓存。此时,可以考虑采取broadcast的方式,将共用的数据广播到每个task上。对于无法广播的数据,可以考虑使用mapPartitions或是foreachPartition,针对每个分区(而不是记录),来新建一个对象缓存。
- 排序引入的大链表。有些业务逻辑复杂,需要在reduce中,对key对应的所有value进行排序。一般直接的思路是用List将value缓存下来,再排序,但是value数据量较大时,List就会占用大量内存。此时,可以从算子上进行优化,考虑使用repartitionAndSortWithinPartitions,借助Spark框架本身的排序,避免用户代码中的排序。
- 堆外内存的溢出
Spark默认不启用堆外内存,可使用spark.memory.offHeap.enabled配置打开。堆外内存可以避免GC的扫描和回收,提升了性能,但同时也需要Spark自行分配和释放,加大了实现的复杂度。
堆外内存也可能发生溢出,比如其中的Direct Memory。Spark的shuffle在使用netty框架进行网络传输时,需要使用Direct Memory缓存数据。当存在数据倾斜,导致reduce从map拉取的数据量过大,申请的堆外内存过多,就会导致Direct Memory溢出。
解决思路:可配置spark.memory.offHeap.size申请更多的堆外内存。针对数据倾斜,也可参考数据倾斜一节提供的思路。
坑3-并发问题
这里的并发,指的是executor内部的并发。
在对Spark任务分配资源时,一般有三个配置num-executors,executor-cores和executor-memory。其中,num-executors决定了有多少个executor,executor-cores决定了一个executor中有多少个task并发执行,这些并发执行的task共享executor-memory配置的内存。并发可能会导致死锁。
这里分享一个我们遇到的死锁问题。当时任务在并行读取avro文件时,挂在了HashMap.getEntry()这个方法上。当时集群环境中的avro版本是1.7.6,这个版本的avro在解析schema的时候,使用了一个全局的hashmap,而hashmap不是线程安全的,在多线程的场景下可能会导致死循环(可参见AVRO-1781)。
解决这类问题有两个思路:
- 规避导致死锁的代码,比如避免使用全局性的变量,替换或升级有问题的第三方jar包;
- 将executor-cores配置为1,避免并发执行。
我们目前采取的是第二种。因为集群是很多业务共用的,包的升级更新不是一时半会儿的事,所以我们通过将executor-cores配置成1,暂时解决了这个问题。
坑4-数据类型使用不当
Spark程序一般基于scala编写,对于scala的一些数据类型的误用,可能会导致性能问题。比如说List和ListBuffer,当需要将大量数据逐个加入链表时,需要使用ListBuffer,而非List。因为List底层使用的其实还是ListBuffer,List在添加元素的时候,每次都需要新建一个ListBuffer。这个新建操作在数据量大的时候,耗时极为明显。
Spark的填坑方法
在Spark大数据分析中踩到坑时,应当如何定位分析问题呢?
- 借助Spark的监测页面
Spark的监测页面已经为任务的运行状态提供了大量的信息。
在Jobs页面可以看到所有的job,进入具体的job,还可以看到job执行的DAG图,以及各个环节的耗时情况。据此我们可以了解到程序被划分成了多少个作业,每个作业的stage是如何划分的,stage内部的task又是如何划分的。这为我们的任务优化现提供了很好的指导。
在Stages页面,进入已完成或是正在运行的stage,可以看到metrics信息,比如输入数据的大小、shuffle数据的大小、GC时间等,通过这些数据,我们可以判断资源分配的是否合理,是否存在数据倾斜等。
在Executors页面,可以看到driver和所有executor的日志,如果任务是在执行中,还可以观察任务执行的堆栈信息。这对于分析任务耗时过长,或是任务挂起,有很大的作用。
Spark监测页面提供的信息已经非常丰富,大部分问题都可以据此解决了。
- 了解Spark的原理和实现
了解Spark作业DAG图的生成原理,我们就能更好的理解Spark监测页面上任务的执行流程。了解Spark的内存模型,我们就能有针对性的解决OOM问题。了解shuffle的过程,我们就能明白数据倾斜的本质,并采用合适的优化措施。想要更好的填坑,就要先了解jio下的这片土地啊。 - 借助jvm工具
Spark上的任务都是启动的JVM。在遇到棘手的问题,根据Spark监测页面也无法获取有效信息时,可以考虑借助jvm工具,如性能分析工具JProfiler等。
Spark在业务中的应用情况
现在Spark在我们的业务中已经起到了重要的作用。
我们的流式计算已经完全迁移到了Spark-streaming上,目前Spark-streaming集群上一共运行了15个流式任务,使用了300核和510G内存。
我们的ETL、机器学习等任务也基本上都在使用Spark。下图中分别是近一周,这类任务的数量趋势和vcore-seconds的使用趋势。
本文叙述了下对Spark概念的一些理解,讨论了下Spark实践中可能踩到的坑,并简单分享了下Spark填坑的一些方法。欢迎大家批评指正。
参考链接