Spark 调优
由于大多数Spark计算都在内存中,所以集群中的任何资源(CPU、网络带宽或内存)都可能成为Spark程序的瓶颈。
大多数情况下,如果数据适合存储在内存中,那么瓶颈就是网络带宽,但是有时,你还需要进行一些调优,例如以序列化的形式存储RDD,以减少内存使用。
本指南将涵盖两个主要主题: 数据序列化和内存调优。数据序列化对于良好的网络性能至关重要,它还可以减少内存使用。我们还概述了几个较小的主题。
数据序列化
序列化在任何分布式应用程序的性能中都扮演着重要的角色。将对象序列化成或消耗大量字节的速度较慢的格式将极大地降低计算速度。通常,这是优化Spark应用程序的第一件事。Spark的目标是在便利性(允许你在操作中使用任何Java类型)和性能之间取得平衡。它提供了两个序列化库:
-
Java序列化: 默认情况下,使用Java的 Spark 序列化对象
ObjectOutputStream
框架,你可以使用任何实现了 java.io.Serializable 的类与它一起使用。你还可以通过继承 java.io.Externalizable 来更深入地控制序列化的性能。Java序列化有着灵活,但通常非常缓慢的特性,会导致许多类的大型序列化格式。 -
Kryo serialization: Spark还可以使用Kryo库(
version 4
)更快地序列化对象。Kryo比Java序列化要快得多(通常多达10倍),也更紧凑,但它不支持所有的Serializable
类型,并且要求你预先 注册 将在程序中使用的类,以获得最佳性能。
你可以通过使用 SparkConf 初始化作业并调用 conf.set("spark.serializer", org.apache.spark.serializer.KryoSerializer")
,切换使用 Kryo
来进行序列化。此设置配置的序列化程序不仅用于在工作节点之间数据 shuffle
,而且还用于将 RDD
序列化到磁盘。Kryo不是默认的 serializer
的唯一原因是自定义的注册要求,但是我们建议在任何网络密集型应用程序中尝试它。自从 Spark 2.0.0 以来,我们在使用简单类型、简单类型数组或字符串类型 RDD 之间 shuffle
时,在内部使用 Kryo serializer
。
Spark 默认包含 Kryo
序列化器,用于 Twitter chill 库中 AllScalaRegistrar
涉及的许多常用的Scala核心类。
要向Kryo注册你自己的自定义类,请使用 registerKryoClasses
方法。
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)
Kryo文档 描述了更高级的注册选项,如添加自定义序列化代码。
如果你的对象很大,你可能还需要增加 spark.kryoserializer.buffer
config。这个值需要足够大,以容纳你要序列化的最大对象。
最后,如果你不注册自定义类,Kryo仍然可以工作,但是它必须将完整的类名存储在每个对象中,这是很浪费资源的。
内存调优
在内存使用时进行调优,有三个考虑因素:
内存大小
: 对象使用的内存 大小 (你可能希望整个dataset
都能在内存中)成本
: 访问对象的成本开销
:gc
开销(如果对象的周转率很高)。
默认情况下,Java对象的访问速度很快,但是很容易比字段中的 “原始(raw)” 数据多消耗2-5倍的内存空间。这是由于以下几个原因造成的:
-
每个不同的 Java 对象都有一个"对象头",它大约有16个字节,包含指向其类的指针等信息。对于一个只有很少数据的对象(比如一个
Int
字段),它可比数据大。 -
Java
String
在原始字符串数据上大约有40个字节的开销(因为他们将其存储在Char
数组中,并保留额外的数据,如长度),由于String
内部使用 UTF-16 编码,因此将每个字符存储为2字节
。因此,一个10个字符的字符串可以轻松地消耗60字节空间。 - 常见的集合类,如
HashMap
和LinkedList
,使用链接数据结构,其中每个元素都有一个 "wrapper" 对象(例如“Map.Entry”)。这个对象不仅有一个头,而且还有指向列表中下一个对象的地址指针(通常每个地址指针有8
个字节)。 - 原始类型的集合通常将它们存储为
boxed
(装箱) 对象,如java.lang.Integer
本节将首先概述Spark中的内存管理,然后讨论用户可以采取的具体策略,以便在他/她的应用程序中更有效地使用内存。特别地,我们将描述如何确定对象的内存使用情况,以及如何改进它——通过更改数据结构或以序列化格式存储数据。然后,我们将讨论调整Spark的 cache size
和Java gc
。
内存管理概论
Spark中的内存使用主要分为两类: 执行和存储。
- 执行内存: 是指用于
shuffle
、join
、sort
和aggregation
的计算 - 存储内存: 是指用于在集群中
cache
和传播内部数据的内存。
在Spark中,执行和存储共享一个统一的区域(M
),当没有执行内存时,存储可以获取所有可用的内存,反之亦然。如果有必要,执行可能会驱逐存储,但仅在总存储内存使用量低于某个阈值(R
)时才会这样做。换句话说,R
描述了 M
中的一个子区域,其中缓存的块永远不会被驱逐。存储可能不会因为执行的复杂性而驱逐执行。
这种设计确保了几个理想的性能。首先,不使用缓存的应用程序可以将整个空间用于执行,从而避免不必要的磁盘溢出。其次,使用缓存的应用程序可以保留最小的存储空间(R),在这里它们的数据块不会被清除。最后,这种方法为各种工作负载提供了合理的开箱即用性能,而不需要用户了解如何在内部划分内存。
虽然有两种相关的配置,但是典型的用户不应该需要调整它们,因为默认值适用于大多数工作负载:
spark.memory.fraction
表示M
的大小是(JVMheap space
- 300MB)的一部分(默认值0.6)。剩下的空间(40%)用于用户数据结构、Spark中的内部元数据,以及在稀疏和异常大的记录情况下防止OOM
错误。-
spark.memory.storageFraction
将R
的大小表示为M
的一部分(默认值为0.5)。R
是M
中的存储空间,缓存的块不会被执行清除。 -
应该设置
spark.memory.fraction
的值,以便在JVM的老年代(old)
或永久代(tenured)
中合适地容纳这部分堆空间。有关详细信息,请参阅下面关于 高级 GC调优的讨论。
确定内存消耗
确定一个 dataset
所需的内存消耗的最好方法是创建一个RDD,将它放到缓存中,然后在web UI中查看 “Storage” 页面。该页将告诉你RDD占用了多少内存。
要估算特定对象的内存消耗,请使用 SizeEstimator
的 estimate
方法。这对于尝试使用不同的数据布局来调整内存使用,以及确定广播变量在每个执行器堆上占用的空间量非常有用。
优化数据结构
减少内存消耗的第一种方法是避免使用增加开销的Java特性,比如 pointer-based
的数据结构和 wrapper
对象。
有几种方法可以做到这一点:
-
设计你自己的数据结构以选择对象数组和基本类型,而不是标准的
Java
或Scala
集合类(e.g.HashMap
)。fastutil 库为与Java
标准 library 兼容的基本类型提供了方便的集合类。 -
尽可能避免嵌套有大量小对象和指针的结构。
-
考虑使用数字id或枚举对象代替
key
的字符串。 -
如果你的内存不足32 GB,那么可以设置 JVM参数
-XX:+UseCompressedOops
,使地址指针变成4个字节,而不是8个字节。你可以在spark-env.sh
中添加这些选项。
序列化 RDD 存储
当对象仍过于庞大,尽管进行了上述调优,仍无法有效地进行存储。 减少内存使用一个更简单的方法是将它们用 serialized
形式进行存储。 在 RDD 持久 API 使用序列化 StorageLevels, 如 MEMORY_ONLY_SER
。Spark 然后将每个RDD分区存储为一个大字节数组。以序列化形式存储数据的惟一缺点是访问时间较慢,因为必须动态地反序列化每个对象。如果你希望以序列化的形式缓存数据,我们强烈建议使用Kryo,因为它比Java序列化(当然也比原始Java对象)的大小小得多。
GC优化
当你的程序存储了大量的 RDD
数据时, JVM GC
可能会成为一个问题。(对于那些只读取一次RDD,然后在上面运行许多操作的程序来说,这通常不是问题。)当Java需要清除旧对象来为新对象腾出空间时,它将需要跟踪所有Java对象并找到未使用的对象。这里要记住的要点是,GC成本与Java对象的数量成比例,因此使用对象较少的数据结构(例如,一个 Int
数组而不是 LinkedList
数组)可以大大降低这一成本。一个更好的方法是以序列化的形式持久化对象,如上所述:现在每个RDD分区将只有 一个* 对象(一个字节数组)。在尝试其他技术之前,如果GC有问题,首先要尝试的是使用序列化缓存。
GC也可能是一个问题,因为任务的工作内存(运行任务所需的空间量)和节点上缓存的 RDD
之间存在干扰。我们将讨论如何控制分配给RDD缓存的空间来缓解这种情况。
测量GC的影响
GC调优的第一步是收集关于 GC发生的频率和GC花费的时间的统计信息。这可以通过在Java 参数中添加 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
来实现。(请参阅配置指南以获取关于将Java参数传递给Spark作业的信息。)下一次运行Spark作业时,你将看到每次发生垃圾收集时在 Worker
日志中打印的消息。注意,这些日志将在你的集群Worker 节点
上(在它们的工作目录中的 stdout
文件中),而不是在你的 Driver 进程上。
高级GC调优
为了进一步调优 GC,我们首先需要了解JVM中关于内存管理的一些基本信息:
- Java堆空间分为两个区域,年轻代(
Young
) 和老年代(Old
) 。年轻代的目的是持有生命周期较短(short-object)的对象,而老年代的目的是持有具有更长的生存期的对象。 - 年轻一代被进一步划分为三个区域[
Eden
,Survivor1
,Survivor2
]。 - GC 过程的简化描述:
- 当Eden已满时,在Eden上运行一个小型GC,从Eden和Survivor1存活的对象被复制到Survivor2。交换
Survivor
区域。 - 如果一个对象足够 Old(在多次GC后还存活) 或Survivor2已满,则将其移动到老年代中。
- 最后,当老年代接近full时,将调用full GC。
在Spark中进行GC调优的目标是确保在老年代中只存储长期存在的 RDD
,而年轻代的大小足以存储短期存在的对象。这将有助于避免在任务执行期间发生 full gc
来收集创建的临时对象。一些可能有用的步骤是:
- 通过收集GC统计信息来检查是否有太多的
GC
发生。如果在任务完成之前多次调用full GC
,这意味着没有足够的可用内存来执行任务。 - 如果
Minor GC
太多而Major GC
太少,那么为Eden分配更多的内存会有所帮助。你可以将Eden的大小设置为高估 (over-estimate) 每个task
所需的内存大小。如果Eden
的大小被确定为E
,那么你可以使用选项-Xmn=4/3*E
来设置年轻代的大小。(4/3的比例也考虑到了Survivor
区域使用的空间, 一个Survivor
占年轻代 1/8 空间) - 在打印的GC统计中,如果老年代空间接近满,通过降低
spark.memory.fraction
来减少用于缓存的内存数量; 缓存更少的对象比降低任务执行速度要好。或者,考虑减少年轻代的规模。这意味着降低-Xmn
,如果你已经设置为上述。如果没有,请尝试更改JVM的NewRatio
参数的值。许多 JVM 将其默认为2,这意味着老一代占用了2/3的堆。它应该足够大,使这个fraction
超过spark.memory.fraction
。 - 尝试使用JVM参数
-XX:+UseG1GC
使用的G1GC
垃圾收集器。在垃圾收集成为瓶颈的某些情况下,它可以提高性能。注意, 对于Executor
堆大小容量大的集群, 用-XX:G1HeapRegionSize
参数增加 G1 区域尺寸 会很重要。 - 例如,如果你的任务正在从HDFS读取数据,则可以使用从HDFS读取的数据块的大小来估计任务所使用的内存量。 注意,解压缩块的大小通常是块大小的2到3倍。因此,如果我们希望有3或4个任务的工作空间,并且
HDFS
块大小为128MB,我们可以估计Eden的大小为4*3*128MB
。 - 监视随新设置的变化, GC 所花费的频率和时间。
我们的经验表明,GC调优的效果取决于你的应用程序和可用的内存量。官网描述了更多的调优选项,但是在高层次上,管理 full GC
发生的频率有助于减少开销。
可以通过在 Job 配置中,设置 spark.executor.extraJavaOptions
来指定 Executor
的GC调优等级。
其它考虑
并行级别
除非将每个 operation
的并行度设置得足够高,否则集群资源不会得到充分利用。Spark根据每个文件的大小自动设置要在每个文件上运行的“map” 任务的数量(尽管你可以通过 SparkContext.textFile
的可选参数来控制它)。对于分布式的“reduce”操作,例如 groupByKey
和 reduceByKey
,它使用最大的父RDD的分区数。你可以将并行度级别作为第二个参数传递(参见 spark.PairRDDFunction 文件),或者设置配置属性spark.default.parallelism
来更改默认值。通常,我们建议在你的集群中每个CPU内核执行2-3个任务。
Reduce任务内存使用
有时,你会得到一个OutOfMemoryError错误,不是因为你的RDD 内存不合适,而是因为你的一个任务的工作集,例如 groupByKey
中的一个 reduce
任务,占用内存太多。Spark的 shuffle
操作( sortByKey
、groupByKey
、reduceByKey
、join
等)在每个任务中构建一个 hash table
来执行分组,分组常常很大。这里最简单的修复方法是增加并行度,这样每个任务的输入集就会更小。Spark可以有效地支持短至200 ms的任务,因为它跨多个任务重用一个 executor JVM
,而且它的任务启动成本很低,所以可以安全地将并行度提高到比集群中的内核数量更多的水平。
广播大变量
使用SparkContext中可用的广播功能可以极大地减少每个序列化任务的大小,以及通过集群启动 job 的成本。如果你的任务使用了 Driver
进程中的任何大对象(例如,一个 static 查找表),请考虑将其转换为广播变量。Spark在 Master
上记录了每个任务的序列化大小,因此可以查看它来决定你的任务是否太大;一般情况下,大于20kb的任务可能值得优化。
数据局部性
数据局部性对Spark作业的性能有很大的影响。如果数据和对其进行操作的代码在一起,那么计算往往会很快。但是,如果代码和数据是分开的,那么其中一个必须转移到另一个。通常,将序列化的代码从一个地方传送到另一个地方要比传送数据块快得多,因为代码的大小比数据小得多。Spark根据数据局部性的一般原则构建其调度。
数据局部性是指数据与处理数据的代码之间的距离。基于数据的当前位置有几个级别的局部性。按从最近到最远的顺序排列:
-
PROCESS_LOCAL
: 数据与运行代码位于相同的JVM
中。这是最好的位置特性。 -
NODE_LOCAL
: 数据在同一个节点上。数据可能在同一节点上的HDFS
中,也可能在同一节点上的另一个Executor
中。这比PROCESS_LOCAL
稍微慢一些,因为数据必须在进程之间传递 -
NO_PREF
: 数据从任何地方访问, 速度都一样快,并且没有区域性优先特性 -
RACK_LOCAL
: 数据位于相同的服务器机架上。数据位于同一机架上的不同服务器上,因此需要通过网络发送,通常是通过单个交换机 -
Any
: 数据都在网络的其他地方,不在同一个机架上
Spark倾向于将所有任务安排在最佳位置级别,但这并不总是可能的。在任何空闲执行器上都没有未处理的数据的情况下,Spark切换到较低的位置级别。有两种选择:
a)等待繁忙的CPU释放出来,在同一服务器上的数据上启动一个任务,或者
b)立即在需要移动数据的较远的地方启动一个新任务。
Spark通常做的是稍作等待,希望繁忙的CPU可以释放。一旦超时过期,它就开始将数据从远处移动到空闲的CPU。每个级别之间回退的等待超时可以单独配置,也可以全部配置在一个参数中; 有关详细信息,见 配置页面 spark.locality
参数 。如果你的任务很长,并且位置不好,你应该增加这些设置,但是默认设置通常工作得很好。
总结
这是一个简短的指南,指出了你在调优Spark应用程序时应该了解的主要问题——最重要的是数据序列化和内存调优。对于大多数程序,切换到Kryo序列化并以序列化的形式持久化数据将解决最常见的性能问题。你可以在Spark邮件列表中 询问其他调优最佳实践。