数据空间
您当前的位置: 首页 /数据知识

一个Spark缓存的使用示例

发布时间:[2018-02-09] 来源:网络大数据
点击量:

中国企业数据治理联盟www.chinaedg.com/

进入》主数据管理        企业数据治理      信息资源规划       数据安全管理

 之前一直不是非常理解Spark的缓存应该如何使用. 今天在使用的时候, 为了提高性能, 尝试使用了一下Cache, 并收到了明显的效果。

关于Cache的一些理论介绍, 网上已经很多了. 但是貌似也没有一个简单的例子说明。

注:因为使用的是内部数据文件, 在这边就不公布出来了. 大家看看测试代码跟测试结果即可。

这次测试是在JupyterNotebook这种交互式的环境下测试的. 如果是直接的submit一个job, 可能结果不太一样。

3.jpg

测试步骤                                                   

初始化Spark

1.   from pyspark.sql import SparkSession

2.   spark = SparkSession\

3.   .builder\

4.   .appName("Cache Demo")\

5.   .master("spark://10.206.132.113:7077") \

6.   .config('spark.driver.memory', '5g') \

7.   .config('spark.executor.memory', '5g') \

8.   .config("spark.cores.max", 20) \

9.   .getOrCreate()

分别读两个文件做测试, 并且其中一个使用Cache

1.   ds1 = spark.read.json(os.path.join(data_path, "data.2018-01-04"))

2.   ds2 = spark.read.json(os.path.join(data_path, "data.2018-01-05"))

3.   ds1.cache() # 对于第一个dataframe进行cache.

: 这两个数据文件分别是14日跟15日产生的. 大小非常接近, 都是3.1G.

为了防止Spark自己做了什么Cache影响实验, 在这里读取两个不同的数据文件.

计算时间:

1.   import time

2.   def calc_timing(ds, app_name) :

3.   t1 = time.time()

4.   related = ds.filter("app_name = '%s'" % app_name)

5.   _1stRow = related.first()

6.   t2 = time.time()

7.   print "cost time:", t2 - t1

测试结果:

1.   calc_timing(ds1, "DrUnzip") # cost time: 13.3130679131

2.   calc_timing(ds2, "DrUnzip") # cost time: 18.0472488403

3.   calc_timing(ds1, "DrUnzip") # cost time: 0.868658065796

4.   calc_timing(ds2, "DrUnzip") # cost time: 15.8150720596

可以看到:

·         对于DS1, 虽然调用了Cache ,但是因为没有真正的使用到, 所以第一次进行filter操作还是很慢的

·         第二次使用DS1的时候, 因为有了缓存, 速度快了很多

·         相对的, DS2两次执行时间差别不大

·         如果进到Spark UI 查看具体每个Job的执行时间, 会发现, 只读取数据文件消耗的时间也就在15~20s.

因此可以猜想, SparkDataFrame读取数据之后, 即使进行两个相同的操作, 消耗的时间也不能减少, 因为Spark 默认不会把DS放到内存之中.中翰软件专注数据治理11http://www.jobhand.cn/


发表评论 共有条评论
用户名: 密码:
匿名发表