景派HPC研究院丨oneAPI并行版Modin与Pandas对比测试
景派HPC研究院
由景派科技的技术团队组成,主要分享高性能计算领域的技术信息,与解答各位朋友在技术上遇到的问题和瓶颈,欢迎大家在文章下方的留言区讨论与提问。
1 Pandas VS Intel Distribution of Modin
1.1 Pandas 简介
Pandas 是什么?Pandas 是一个强大的分析结构化数据的工具集;它的使用基础是 Numpy(提供高性能的矩阵运算);用于数据挖掘和数据分析,同时也提供数据清洗功能[1]。
1.2 Pandas 分布式计算现状
原生 Python 在多线程处理方面比较弱,不能充分利用多核 CPU 的计算能力。同样,原生 Pandas 功能很强大,但不支持分布式计算,因此难以处理大量数据。目前的解决方案是Pandas on Ray
和Pandas on Dask
,能够在尽量不改变 Pandas 的 API 情况下实现分布式内存计算[2]。相比于目前很火的分布式内存计算框架 Spark,前者是基于 Python 生态,而 Spark 是基于 Hadoop 生态[3-5]。
1.3 Intel Distribution of Modin 简介
这是 Intel 提供的一个分布式 DataFrame 库,可以使用与 Pandas 相同的 API 跨多个节点进行数据预处理。该库在后端与OmniSci集成,用于加速分析。而Modin原本是一个开源项目,基于 Python 下的分布式计算框架Dask
和Ray
对 Pandas 进行并行化加速。Intel Distribution of Modin 就是基于开源版的 Modin 优化而来,对 Ray 和 Dask 均有优化[6]。
2 安装方法
2.1 Pandas
官方版 Pandas 安装是非常方便的,可以直接使用conda
来安装:
conda install pandas
或者使用 pip 来安装:
pip install pandas
2.2 Intel Distribution of Modin
参考Intel Software 官网的安装文档, Intel Distribution of Modin
并不包含在Intel® AI Analytics Toolkit
安装程序中,需要用APT Package Manager
、Conda Package Manager
、Docker Hub
、YUM Package Manager
或者Zypper Package Manager
来获取安装。本文使用Anaconda
来安装,只需要使用如下命令即可:
conda create -n aikit-modin intel-aikit-modin -c intel -c conda-forge
使用上述命令回新建一个叫做aikit-modin
的 Anaconda 虚拟环境,为了使用这个环境中的 Modin,我们只需要激活他即可:
conda activate aikit-modin
3 Pandas VS Intel Distribution of Modin[Dask]
根据 Modin 官方 GitHub 仓库主页,其对 Pandas API 的支持情况如下图所示:
3.1 导包构建环境
import os
os.environ["MODIN_ENGINE"] = "dask"# 设置Modin底层使用Dask框架进行并行计算
import pandas as pd# 导入官方版pandas
import modin.pandas as mpd# 导入Modin版pandas
import numpy as np
import time
import matplotlib.pyplot as plt
#导入中文字体,避免显示乱码
plt.rcParams['font.family'] = 'WenQuanYi Micro Hei'# 适用于CentOS下的
plt.rcParams['axes.unicode_minus'] = False
3.2 读入.csv
文件速度对比
Modin 基本覆盖了 Pandas 所有经常用到的读入数据的 API,其中pd.read_csv
是最最常用的,因此本文将以此 API 为例进行对比。如下代码将会分别使用 Pandas 和 Modin 读入 9GB 的移动基站流量数据,对比两者的耗时,并且以图的形式展示。
t = time.time()
pd_df = pd.read_csv('/home/caibo/jupyter_proj/data/mathor_cup/train1.csv',
encoding='gbk')
pd_time = time.time() - t
print('Pandas times : {} s'.format(pd_time))
t = time.time()
md_df = mpd.read_csv('/home/caibo/jupyter_proj/data/mathor_cup/train2.csv',
encoding='gbk')
mpd_time = time.time() - t
print('Modin times: {} s'.format(mpd_time))
plt.bar(['Pandas', 'Modin'], [pd_time, mpd_time])
plt.ylabel('Times(s)')
plt.title('pd.read_csv数据读入速度对比')
plt.show()
Pandas times : 181.7861328125 s
UserWarning: Dask execution environment not yet initialized. Initializing...
To remove this warning, run the following python code before doing dataframe operations:
from distributed import Client
client = Client()
UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 44508 instead
Modin times: 46.26690101623535 s
结果可知,读取同样的文件,使用基于 Dask 框架的 Modin 的速度是 Pandas 的392.91%
,加速效果非常明显!
3.3 DataFrame
基本操作速度对比
在 Pandas 数据分析和处理中,对 DataFrame 的操作是非常多的,因此如果将 DataFrame 的常用操作并行化,将会极大提高数据分析和处理的效率。接下来我们会对比测试基于 Dask 框架的 Modin 和 Pandas 调用 DataFrame API 进行数据分析和处理的速度。Pandas.concat()
、DataFrame.apply()
和DataFrame.describe()
这三个 API 的使用非常频繁而且易于并行化实现,因此本文将会对比 Modin 和 Pandas 进行这两个操作的耗时。
下面的带代码使用 Modin 和 Pandas 分别合并 DataFrame,并且对比两者的耗时。
t = time.time()
pd.concat([pd_df, pd_df], axis=0)
t_pd_info = time.time() - t
print('Pandas times : {} s'.format(t_pd_info))
t = time.time()
mpd.concat([md_df, md_df], axis=0)
t_mpd_info = time.time() - t
print('Modin times: {} s'.format(t_mpd_info))
plt.bar(['Pandas', 'Modin'], [t_pd_info, t_mpd_info])
plt.ylabel('Times(s)')
plt.show()
Pandas times : 23.865532159805298 s
Modin times: 3.5132291316986084 s
下面的带代码使用 Modin 和 Pandas 分别使用DataFrame.apply()
对数据的日期
、时间
字段进行修正,并且对比两者的耗时。
def time_correct(df):
# 时间修正
# 将0x:00:00 的时间修正为 x:00:00
# 将xx:00:0 的时间修正为 xx:00:00
# df['时间']=clock_correct(df)
return df['时间'].apply(
lambda x: '0' + str(int(x[0:x.index(":")])) + ':00:00'if int(x[
0:x.index(":")]) < 10else str(int(x[0:x.index(":")])) + ':00:00')
def date_correct(df):
# 日期修正
# 将0xx-0x-0x 的时间修正为 20xx/x/x
# df['日期']=clock_correct(df)
return df['日期'].apply(lambda x: str(
str(2000 + int(x[0:x.index("-")])) + '/' + str(
int(x[4:x.index('-', 4)])) + '/' + str(int(x[7:])))
if x.count('-') > 0else x)
t = time.time()
pd_df['日期'] = date_correct(pd_df)
pd_df['时间'] = time_correct(pd_df)
t_pd_info = time.time() - t
print('Pandas times : {} s'.format(t_pd_info))
t = time.time()
md_df['日期'] = date_correct(md_df)
md_df['时间'] = time_correct(md_df)
t_mpd_info = time.time() - t
print('Modin times: {} s'.format(t_mpd_info))
plt.bar(['Pandas', 'Modin'], [t_pd_info, t_mpd_info])
plt.ylabel('Times(s)')
plt.show()
Pandas times : 370.6549892425537 s
Modin times: 0.29850006103515625 s
此测试种 Modin 相比 Pandas 执行速度快了非常多,有效性有待验证。
下面的带代码分别使用 Modin 和 Panda 的 DataFrame.mean() API 获取 DataFrame 的统计信息,并且对比两者的耗时。
t = time.time()
pd_df['上行业务量GB'].mean()
t_pd_info = time.time() - t
print('Pandas times : {} s'.format(t_pd_info))
t = time.time()
md_df['上行业务量GB'].mean()
t_mpd_info = time.time() - t
print('Modin times: {} s'.format(t_mpd_info))
plt.bar(['Pandas', 'Modin'], [t_pd_info, t_mpd_info])
plt.ylabel('Times(s)')
plt.show()
Pandas times : 4.411945343017578 s
Modin times: 100.35079002380371 s
可以看到对于这个接口,Modin 并没有 Pandas 快,慢了非常多;此外还发现 Modin 运行过程中内存占用非常大,执行再复杂的操作如DataFrame.describe()
时会直接占满内存导致异常中断,而原生 Pandas API 并不会。
4 测试小结
基于 Dask 框架的 Intel Distribution of Modin 可以实现 Pandas 大部分 API 的并行化,比如数据读入、一些 DataFrame 和 Series 的基本操作。对于那些易于并行化实现的 API,Modin 基本上都覆盖了,而那些本不可以并行化实现的 API 依旧调用自 Pandas。部分 API 使用 Modin 可以显著提高处理速度,特别是DataFrame.apply()
;但是有一些操作不仅不能提速,反而没有 Pandas 原生 API 速度快,造成负优化。此外由于 Modin 还不够完善,在使用过程中调用 M 某些 API 时会出现内存占用过高、进程无端中断的问题。以上测试均是在单机环境下进行的,集群环境下的使用效果有待检验。
参考资料
[1] Pandas官网:https://pandas.pydata.org/
[2] Modin官网:https://github.com/modin-project/modin
[3] Spark vs Dask Python生态下的计算引擎:https://cloud.tencent.com/developer/article/1702051
[4] python 链式计算框架_Python的分布式计算框架——Dask调度器简介:https://blog.csdn.net/weixin_39653766/article/details/111668770
[5] Ray 分布式计算框架介绍:https://zhuanlan.zhihu.com/p/111340572
[6] Intel® oneAPI AI Analytics Toolkit:https://software.intel.com/cont