Spark RDD 3 --- 持久化

介绍RDD持久化的相关概念和配置

Posted by Cheney.Yin on December 21, 2019

RDD持久化

Spark中一个很重要的能力是将数据持久化(或称为缓存),在多个操作间都可以访问这些持久化的数据。当持久化一个RDD时,每个节点的其它分区都可以访问这些持久化的数据,在该数据上的其它action操作将直接使用内存中的数据。这样会让以后的action操作计算速度加快(通常运行速度会加速10倍)。缓存是迭代算法和快速的交互式使用的重要工具。

RDD可以使用persist()方法或cache()方法进行持久化。数据将会在第一次action操作时进行计算,并缓存在节点的内存中。Spark的缓存具有容错机制,如果一个缓存的RDD的某个分区丢失了,Spark将按照原来的计算过程,自动重新计算并进行缓存。

另外,每个持久化的RDD可以使用不同的存储级别进行缓存,例如,持久化到磁盘、已序列化的Java对象形式持久化到内存(可以节省空间)、跨节点间复制、以off-heap方式存储在Tachyon(利用 JDK Unsafe API,从 Spark 2.0 开始,在管理堆外的存储内存时不再基于 Tachyon,而是与堆外的执行内存一样,基于 JDK Unsafe API 实现)。这些存储级别通过传递一个StorageLevel对象给persist()方法进行设置。cache()方法是使用默认存储级别的快捷设置方法,默认存储级别是StorageLevel.MEMORY_ONLY(将反序列化的对象存储到内存中。)详细的存储级别如下:

  • MEMORY_ONLY:(默认级别)将RDD以反序列化Java对象的形式存储在JVM中。如果内存空间不足,部分数据分区将不再缓存,在每次需要用到这些数据时重新计算。

  • MEMORY_AND_DISK:将RDD以反序列化Java对象的形式存储在JVM中。如果内存不够,将未缓存的数据分区存储到磁盘,在需要使用这些分区时重磁盘读取。

  • MEMORY_ONLY_SER:将RDD以序列化的Java对象形式存储到内存。(每个分区为一个byte数组)。这种方式会比反序列化对象的方式节省很多空间,尤其时在使用fast serializer时会节省更多空间,但是在读取时会增加CPU计算负载。

  • MEMORY_AND_DISK_SER:类似于MEMORY_ONLY_SER,但是溢出的分区会存储到磁盘,而不是在用到它们时重新计算。

  • DISK_ONLY:只在磁盘上缓存RDD。

  • MEMORY_ONLY_2MEMORY_AND_DISK_2,等等:同上面的级别功能相同,只不过每个分区在集群中两个节点上建立副本。

  • OFF_HEAP:类似于MEMORY_ONLY_SER,但是将数据存储在off-heap memory,这需要启动off-heap内存。

注意,python缓存对象总是使用Pickle进行序列化,所以在Python中不关心你选择的是哪一种序列化级别。python中存储级别包括MEMORY_ONLYMEMORY_ONLY_2MEMORY_AND_DISKMEMORY_AND_DISK_2DISK_ONLYDISK_ONLY_2。 在shuffle操作中,即便是用户没有调用persist方法,Spark也会自动缓存部分中间数据。这么做的目的是在shuffle的过程中某个节点运行失败时,不需要重新计算所有的输入数据。如果用户想多次使用某个RDD,强烈推荐在该RDD上调用persist方法。

如何选择存储级别

Spark存储级别的选择,核心问题是在内存使用率CPU效率之间进行权衡。建议按下面的过程进行存储级别的选择:

  • 如果使用默认的存储级别(MEMORY_ONLY),存储在内存中RDD没有发生溢出,那么就选择默认的存储级别。默认存储级别可以最大限度的提高CPU的使用效率,可以使在RDD上的操作以最快的速度运行。

  • 如果内存不能全部存储RDD,那么使用MEMORY_ONLY_SER,并挑选一个快速序列化库将对象序列化,以节省内存空间。使用这种存储级别,计算速度仍然很快。

  • 除了在计算该数据集的代价特别高,或者在需要过滤大量数据的情况下,尽量不要将溢出的数据存储到磁盘。因为,重新计算这个数据分区的耗时与从磁盘读取这些数据的耗时差不多。

  • 如果想快速还原故障,建议使用多副本存储级别(例如,使用Spark作为web应用的后台服务,在服务出故障时需要快速恢复的场景下)。所有的存储级别都通过重新计算丢失的数据方式,提供了完全的容错机制。但是多副本级别在发生数据丢失时,不需要重新计算对应的数据库,可以让任务计算继续运行。

删除数据

Spark自动监控各个节点上的缓存使用率,并以最近最少使用的方式(LRU)将旧数据块移除内存。如果想手动移除一个RDD,而不是等待该RDD被Spark自动移除,可以使用RDD.unpersist()方法。