用ApacheArrow加速PySpark

成玉

用ApacheArrow加速PySpark

Pandas、Numpy是做数据分析最常使用的Python包,如果数据存在Hadoop又想用Pandas做一些数据处理,通常会使用PySpark的DataFrame.toPandas()这个方法。让人不爽的是,这个方法执行很慢,数据量越大越慢。

做个测试

Using Python version 2.7.14 (default, Oct  5 2017 02:28:52)  
SparkSession available as 'spark'.  
>>> def test():
...     from pyspark.sql.functions import rand
...     from better_utils import TimeUtil
...     start = TimeUtil.now_unix()
...     df = spark.range(1 << 22).toDF('id').withColumn("x", rand())
...     df.toPandas()
...     cost = TimeUtil.now_unix() - start
...     print "耗时:{}s".format(cost)
... 
>>> test()
耗时:39s                                                                        
>>> spark.conf.set("spark.sql.execution.arrow.enabled", "true")
>>> test()
/Users/yulian/anaconda3/envs/python2/lib/python2.7/site-packages/pyarrow/__init__.py:152: UserWarning: pyarrow.open_stream is deprecated, please use pyarrow.ipc.open_stream
  warnings.warn("pyarrow.open_stream is deprecated, please use "
耗时:2s                                                                         
>>> 

可以看到,近500w数据的toPandas操作,开启arrow后,粗略耗时统计从39s降低为2s。
如何开启arrow,就是spark.sql.execution.arrow.enabled=true这个配置了,spark2.3开始支持。
另外需要安装pip install pyarrow。

是什么

Arrow是一种跨语言的基于内存的列式数据结构。

在分布式系统内部,每个系统都有自己的内存格式,大量的 CPU 资源被消耗在序列化和反序列化过程中,并且由于每个项目都有自己的实现,没有一个明确的标准,造成各个系统都在重复着复制、转换工作,这种问题在微服务系统架构出现之后更加明显,Arrow 的出现就是为了解决这一问题。作为一个跨平台的数据层,我们可以使用 Arrow 加快大数据分析项目的运行速度。
image.png
需要明确的是,Apache Arrow 不是一个引擎,也不是一个存储系统,它是用来处理分层的列式内存数据的一系列格式和算法。

为什么

PySpark中使用DataFrame.toPandas()将数据从Spark DataFrame转换到Pandas中是非常低效的。
Spark和Python基于Socket通信,使用serializers/deserializers交换数据。
Python的反序列化pyspark.serializers.PickleSerializer使用cPickle模块的标准pickle格式。

Spark先把所有的行汇聚到driver上,然后通过初始转换,以消除Scala和Java之间的任何不兼容性,使用Pyrolite库的org.apache.spark.api.python.SerDeUtil.AutoBatchedPickler去把Java对象序列化成pickle格式。
然后序列化后的数据分批发送个Python的worker子进程,这个子进程会反序列化每一行,拼成一个大list;最后利用 pandas.DataFrame.from_records() 从这个list来创建一个Pandas DataFrame。

上面的过程有两个明显问题:
1)即使使用CPickle,Python的序列化也是一个很慢的过程。
2)利用 from_records 来创建一个 pandas.DataFrame 需要遍历Python list,将每个value转换成Pandas格式。

Arrow可以优化这几个步骤:
1)一旦数据变成了Arrow的内存格式,就不再有序列化的需要,因为Arrow数据可以直接发送到Python进程。
2)当在Python里接收到Arrow数据后,pyarrow可以利用zero-copy技术,一次性的从整片数据来创建 pandas.DataFrame,而不需要轮询去处理每一行记录。另外转换成Arrow数据的过程可以在JVM里并行完成,这样可以显著降低driver的压力。

Arrow有点可以总结为:
* 序列化友好
* 向量化

序列化友好指的是,Arrow提供了一个内存格式,该格式本身是跨应用的,无论你放到哪,都是这个格式,中间如果需要网络传输这个格式,那么也是序列化友好的,只要做下格式调整(不是序列化)就可以将数据发送到另外一个应用里。这样就大大的降低了序列化开销。
向量化指的是,首先Arrow是将数据按block进行传输的,其次是可以对立面的数据按列进行处理的。这样就极大的加快了处理速度。

感兴趣的话可以看下Python各种序列化方案的对比:
http://satoru.rocks/2018/08/fastest-way-to-serialize-array/