spark在大数据分析中的填坑姿势

spark在大数据分析中的填坑姿势

Spark和MapReduce是大数据处理中应用广泛的两套计算框架。

之前我们的数据团队在批处理上,用的是MapReduce框架。在实时统计上,用的是Storm框架。同时我们也面临着一些问题。MapReduce和Storm是两套不同的框架,这带来了一定的学习成本。MapReduce二阶段的编程方式和Storm拓扑的编程方式,使得业务逻辑散落在不同的代码片段中,代码可读性比较差,提高了代码开发和维护的成本。另外,MapReduce在批处理中,也常会遇到因数据量过大而带来的性能问题。

我们希望能够找到一个能够同时支持批处理和实时,并且在批处理的性能上并不逊于MapReduce的技术框架。为此,我们引入了Spark和Spark-streaming。

spark图标2.png

Spark有以下几点优势。

  1. Spark支持批处理,Spark-streaming支持流式计算,这让我们可以在批处理和实时处理上统一技术框架,降低技术学习和平台维护的成本;
  2. 基于Spark的程序开发,提供了丰富的算子和链式编程,提高了编码效率和代码的可读性;
  3. Spark基于内存的计算方式,性能要显著高于MapReduce基于磁盘的二阶段范式,尤其是在机器学习的场景中,需要对一份数据不断迭代运算时,Spark的性能优势更加显著。

不过,一切,真的都如此美好么?

对Spark的误解

我们先来谈下对Spark理解上的一个坑。

“Spark基于内存的计算方式,性能要显著高于MapReduce基于磁盘的二阶段范式”。
这句话,是诱人的,却也是坑人的。粗略一看,内存比磁盘快很多,误以为Spark全速领跑。
不过任何一个工具的表现,都离不开使用场景。

对于ETL任务,Spark采用的其实也是MapReduce的二阶段范式。

MapReduce二阶段范式最耗时的地方在哪里呢?在shuffle。shuffle有两个操作很耗时,一个是数据在网络上的传输,另一个是中间数据的落盘。实际场景中,数据量一般较大,数据存在集群不同的机器上,网络传输往往无法避免。而内存又比磁盘成本高很多,一般很难分配足够存下所有中间数据的内存,此时,中间数据的落盘也无法避免。

尽管Spark在shuffle阶段也做过一些优化,但网络传输和落盘这两步,终归无法避免。所以说,“显著”的提升,是谈不上的。

讲到这里,其实Spark官方也是鸡贼,首页贴出一个逻辑回归的性能比较图,再加上一个大大的100X faster。拉萌新上车,妥妥的。
spark100Xfaster.png

在Spark填的那些坑

既然上车了,下面就分享一下在Spark实践中常遇到的坑(主要是针对Spark在ETL任务中的实践)。

坑1-数据倾斜

数据倾斜是MR二阶段范式中最普遍的问题,数据倾斜和shuffle紧密相关。
下图是《Hadoop权威指南》一书中,对MapReduce框架shuffle过程的说明。虽然MapReduce的shuffle和Spark的shuffle在细节实现上有所不同,但却不妨碍我们用这张图来理解shuffle的大致过程。

MRShuffle图.png

shuffle的字面意思是混洗,其最终目的是将同一个key下的数据聚合到一起处理。
如上图所示,原始数据被分成很多份,一个map task处理一份。map task从输入中提取出需要的数据,将数据处理成KV的形式。key通过hash的方式被分配到不同的reduce上(实际上是reduce task从map端主动拉取对应key的数据)。最终,同一个key的数据会汇总到同一个reduce中,进行下一步处理。

在shuffle的过程中,当一些key对应的数据过多时,会使对应的reduce task处理的数据量过大,导致数据倾斜。这些负载过高的reduce task的运行时间明显变长,进而拖慢整个任务的执行速度,甚至可能发生内存溢出,导致任务失败。

数据倾斜的解决思路

  1. 如果资源允许,可以直接增大reduce的并行度或者每个reduce的内存;
  2. 可以从Spark的算子上进行优化,比如使用reduceByKey取代groupByKey;
  3. 从业务逻辑着手,提前剔除不需要参与shuffle的异常数据;
  4. 对数据进行预处理。比如先处理小时数据,再根据小时维度的数据进一步聚合成天数据;
  5. 结合实际需求,选择合适的算法。如在不需要精确统计UV的场景下,可以选择hyperloglog来实现。

坑2-内存溢出

OOM绝对是坑王,因为它是致命的。OOM和数据倾斜常常粘在一起,这在上一节已有提及。
我们先来了解一下Spark的内存模型。Spark会启动两种JVM进程:driver和executor,本文主要介绍executor的内存模型。JVM的内存可分为堆外内存和堆内内存。Spark又将堆内内存分为存储内存、执行内存和用户内存,而堆外内存则只包含存储内存和执行内存。下图是Spark堆内内存模型

spark内存模型.png

下面我们分别来介绍各块内存溢出的情况。

  1. 执行内存的溢出
    执行内存用来缓存shuffle中间数据。执行内存的溢出一般是由数据倾斜引起的,数据倾斜使得执行内存需要缓存的数量过大,导致OOM。虽然Spark在预估到执行内存不足的时候,会考虑占用存储内存,或是溢写到磁盘,但是,Spark对执行内存的预估是采样进行的,无法保证预估的准确性和及时性。当缓存的数据超过上次提前申请的内存,Spark又还没来得及进行下一次预估时,就会发生OOM。
    执行内存溢出的解决思路和数据倾斜类似,此处不再赘述。
  2. 存储内存的溢出
    存储内存用来缓存RDD或是broadcast数据。当一个RDD被重复使用的时候,我们可能会选择将RDD缓存下来,否则每次使用到这个RDD时,都可能重新计算一遍。当我们在内存中缓存过大的RDD时,就有可能发生OOM。
    此时,我们可以调整RDD的缓存等级,由内存缓存调整为内存+磁盘的缓存方式。这样内存不足时,就会溢写到磁盘了。代码中用来缓存RDD的接口是cache()和persist()。通过阅读源码,可以看到cache实际上就是调用的persist(),不过存储级别被默认设置为了MEMORY_ONLY。当需要缓存的RDD过大时,我们可以使用persist来缓存RDD,并根据需求选择恰当的StorageLeval,如MEMORY_AND_DISK。
  3. 用户内存的溢出
    用户内存用来缓存用户定义的数据。当我们内存使用不当的时候,可能导致用户内存的溢出。
    这里举两个内存使用不当的例子及对应的解决思路。

    1. 新建了过多的大对象。有一些数据是共用的(如配置表数据),却在map/reduce端处理每条记录时,都重新加载,并新建一个对象缓存。此时,可以考虑采取broadcast的方式,将共用的数据广播到每个task上。对于无法广播的数据,可以考虑使用mapPartitions或是foreachPartition,针对每个分区(而不是记录),来新建一个对象缓存。
    2. 排序引入的大链表。有些业务逻辑复杂,需要在reduce中,对key对应的所有value进行排序。一般直接的思路是用List将value缓存下来,再排序,但是value数据量较大时,List就会占用大量内存。此时,可以从算子上进行优化,考虑使用repartitionAndSortWithinPartitions,借助Spark框架本身的排序,避免用户代码中的排序。
  4. 堆外内存的溢出
    Spark默认不启用堆外内存,可使用spark.memory.offHeap.enabled配置打开。堆外内存可以避免GC的扫描和回收,提升了性能,但同时也需要Spark自行分配和释放,加大了实现的复杂度。
    堆外内存也可能发生溢出,比如其中的Direct Memory。Spark的shuffle在使用netty框架进行网络传输时,需要使用Direct Memory缓存数据。当存在数据倾斜,导致reduce从map拉取的数据量过大,申请的堆外内存过多,就会导致Direct Memory溢出。
    解决思路:可配置spark.memory.offHeap.size申请更多的堆外内存。针对数据倾斜,也可参考数据倾斜一节提供的思路。

坑3-并发问题

这里的并发,指的是executor内部的并发。

在对Spark任务分配资源时,一般有三个配置num-executors,executor-cores和executor-memory。其中,num-executors决定了有多少个executor,executor-cores决定了一个executor中有多少个task并发执行,这些并发执行的task共享executor-memory配置的内存。并发可能会导致死锁。

这里分享一个我们遇到的死锁问题。当时任务在并行读取avro文件时,挂在了HashMap.getEntry()这个方法上。当时集群环境中的avro版本是1.7.6,这个版本的avro在解析schema的时候,使用了一个全局的hashmap,而hashmap不是线程安全的,在多线程的场景下可能会导致死循环(可参见AVRO-1781)。

解决这类问题有两个思路:

  1. 规避导致死锁的代码,比如避免使用全局性的变量,替换或升级有问题的第三方jar包;
  2. 将executor-cores配置为1,避免并发执行。

我们目前采取的是第二种。因为集群是很多业务共用的,包的升级更新不是一时半会儿的事,所以我们通过将executor-cores配置成1,暂时解决了这个问题。

坑4-数据类型使用不当

Spark程序一般基于scala编写,对于scala的一些数据类型的误用,可能会导致性能问题。比如说List和ListBuffer,当需要将大量数据逐个加入链表时,需要使用ListBuffer,而非List。因为List底层使用的其实还是ListBuffer,List在添加元素的时候,每次都需要新建一个ListBuffer。这个新建操作在数据量大的时候,耗时极为明显。

Spark的填坑方法

在Spark大数据分析中踩到坑时,应当如何定位分析问题呢?

  1. 借助Spark的监测页面
    Spark的监测页面已经为任务的运行状态提供了大量的信息。

在Jobs页面可以看到所有的job,进入具体的job,还可以看到job执行的DAG图,以及各个环节的耗时情况。据此我们可以了解到程序被划分成了多少个作业,每个作业的stage是如何划分的,stage内部的task又是如何划分的。这为我们的任务优化现提供了很好的指导。
在Stages页面,进入已完成或是正在运行的stage,可以看到metrics信息,比如输入数据的大小、shuffle数据的大小、GC时间等,通过这些数据,我们可以判断资源分配的是否合理,是否存在数据倾斜等。
在Executors页面,可以看到driver和所有executor的日志,如果任务是在执行中,还可以观察任务执行的堆栈信息。这对于分析任务耗时过长,或是任务挂起,有很大的作用。
Spark监测页面提供的信息已经非常丰富,大部分问题都可以据此解决了。

  1. 了解Spark的原理和实现
    了解Spark作业DAG图的生成原理,我们就能更好的理解Spark监测页面上任务的执行流程。了解Spark的内存模型,我们就能有针对性的解决OOM问题。了解shuffle的过程,我们就能明白数据倾斜的本质,并采用合适的优化措施。想要更好的填坑,就要先了解jio下的这片土地啊。
  2. 借助jvm工具
    Spark上的任务都是启动的JVM。在遇到棘手的问题,根据Spark监测页面也无法获取有效信息时,可以考虑借助jvm工具,如性能分析工具JProfiler等。

Spark在业务中的应用情况

现在Spark在我们的业务中已经起到了重要的作用。

我们的流式计算已经完全迁移到了Spark-streaming上,目前Spark-streaming集群上一共运行了15个流式任务,使用了300核和510G内存。

我们的ETL、机器学习等任务也基本上都在使用Spark。下图中分别是近一周,这类任务的数量趋势和vcore-seconds的使用趋势。

spark任务数和vcoreseconds.png


本文叙述了下对Spark概念的一些理解,讨论了下Spark实践中可能踩到的坑,并简单分享了下Spark填坑的一些方法。欢迎大家批评指正。

参考链接

  1. Spark 是否真的比 MapReduce 技高一筹
  2. Spark Misconceptions
  3. Spark Memory Management
  4. Schema.parse is not thread safe
  5. Apache Spark 内存管理详解
  6. Deep Understanding of Spark Memory Management Model
  7. Spark Shuffle 堆外内存溢出问题与解决(Shuffle通信原理)

添加新评论

我们会加密处理您的邮箱保证您的隐私. 标有星号的为必填信息 *