data nerd
The hidden cost of Spark withColumn
Cross posted from https://manuzhang.github.io/2018/07/11/spark-catalyst-cost.html.
Recently, we’ve been working on a machine learning pipeline with Spark, where Spark SQL & DataFrame are used for data preprocessing and MLlib for training. In one use case, the data source is a very wide Hive table of ~1000 columns. The columns are stored as String, so we need to cast them to Integer before they can be fed into model training.
This is what I initially wrote with the DataFrame Scala API (Spark 2.2.0).
df.columns.foldLeft(df) { case (df, col) => df.withColumn(col, df(col).cast(IntegerType))}
Since DataFrame is immutable, I created a new DataFrame for each Column cast using withColumn. I didn’t think that would be a big deal, since at run time all columns would be cast in one shot thanks to Spark’s Catalyst optimizer.
“At its core, Catalyst contains a general library for representing trees and applying rules to manipulate them. On top of this framework, we have built libraries specific to relational query processing (e.g., expressions, logical query plans), and several sets of rules that handle different phases of query execution: analysis, logical optimization, physical planning, and code generation to compile parts of queries to Java bytecode.”
To my surprise, the job got stuck in submission for minutes without outputting anything. Luckily, I have a nice colleague, Vincent, who saved my day with the following fix.
df.select(df.columns.map { col => df(col).cast(IntegerType)}: _*)
He suspected that it’s expensive to call withColumn a thousand times. Hence, he dived into its implementation and found the above in the private method withColumns, which is called by withColumn. In his fast version, only one new DataFrame was created.
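To make the two forms easy to compare side by side, here is a minimal, self-contained sketch. It assumes a Spark 2.x or later spark-sql dependency on the classpath and a local master; the object name, the helper names and the three-column DataFrame are made up for illustration and merely stand in for the wide Hive table.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.IntegerType

object CastAllColumns {
  // Slow form: every withColumn call returns a new DataFrame.
  def castWithColumn(df: DataFrame): DataFrame =
    df.columns.foldLeft(df) { (acc, name) =>
      acc.withColumn(name, acc(name).cast(IntegerType))
    }

  // Fast form: build every Column expression first, then create one DataFrame.
  // The explicit alias keeps the original column name regardless of Spark version.
  def castWithSelect(df: DataFrame): DataFrame =
    df.select(df.columns.map(name => df(name).cast(IntegerType).as(name)): _*)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("cast-all-columns")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical stand-in for the wide table: three String columns.
    val df = Seq(("1", "2", "3"), ("4", "5", "6")).toDF("a", "b", "c")

    castWithColumn(df).printSchema()
    castWithSelect(df).printSchema()
    spark.stop()
  }
}
```

Both helpers should yield the same columns and values; they differ only in how many intermediate DataFrames are created along the way.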
I wondered why there was a significant cost difference and looked further into it. After turning on the debug log, I saw a lot of "=== Result of Batch Resolution ===" entries in my slow version. It suddenly struck me that Catalyst’s analysis might not be free. A thousand withColumn calls actually meant a thousand rounds of analysis, and the same holds for any transformation on DataFrame, since each one produces a new DataFrame that is analyzed right away. On the other hand, a transform on Column is lazy: nothing is analyzed until the Column is used to create a DataFrame.
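One way to see the eager/lazy difference for yourself is to reference a column that doesn’t exist. The sketch below assumes the classic (non-Connect) Dataset API with spark-sql on the classpath; the object name, the helper and the column name no_such_column are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.IntegerType
import scala.util.Try

object EagerAnalysisDemo {
  // Returns the error (if any) raised while merely constructing a DataFrame.
  def analysisError(df: DataFrame): Option[Throwable] = {
    // Building a Column expression does not touch any plan, so this line
    // does not throw even though the column name is bogus (hypothetical).
    val unresolved = col("no_such_column").cast(IntegerType)

    // Creating a DataFrame analyzes the plan right away, before any action.
    Try(df.select(unresolved)).failed.toOption
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("eager-analysis-demo")
      .getOrCreate()
    import spark.implicits._

    val df = Seq("1", "2").toDF("a")
    println(analysisError(df).map(_.getClass.getName))
    spark.stop()
  }
}
```

No action such as collect or count is ever called here, yet the select is expected to fail with an AnalysisException, which is the same analysis pass that every withColumn call pays for.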
The log led me to org.apache.spark.sql.catalyst.rules.RuleExecutor, where I spotted a timeMap tracking the time spent running specific rules. Even better, the statistics were exposed through RuleExecutor.dumpTimeSpent, which I could call to compare the costs of the two versions.
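import org.apache.spark.sql.catalyst.rules.RuleExecutor

// Slow version: dump the accumulated rule timings before each withColumn call.
df.columns.foldLeft(df) { case (df, col) =>
  println(RuleExecutor.dumpTimeSpent())
  df.withColumn(col, df(col).cast(IntegerType))
}
println(RuleExecutor.dumpTimeSpent())

// Fast version: dump the timings before each Column#cast and once after the single select.
df.select(df.columns.map { col =>
  println(RuleExecutor.dumpTimeSpent())
  df(col).cast(IntegerType)
}: _*)
println(RuleExecutor.dumpTimeSpent())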
As expected, the time spent increased with each DataFrame#withColumn call, while it stayed the same across the Column#cast calls. One round of analysis took about 100 ms. (I have a table of time spent for all 56 rules in 100 withColumn calls in the appendix if you are curious.)
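If you only want a rough wall-clock comparison rather than per-rule statistics, a sketch like the following may do. It assumes spark-sql on the classpath and a local master; timed and wideFrame are hypothetical helpers, and the width of 200 is an arbitrary choice. No action is triggered, so only the construction of the DataFrames is measured, and the numbers will vary by machine and Spark version.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.IntegerType

object AnalysisCostDemo {
  // Runs `body` once and returns its result with the elapsed milliseconds.
  def timed[A](body: => A): (A, Long) = {
    val start = System.nanoTime()
    val result = body
    (result, (System.nanoTime() - start) / 1000000L)
  }

  // Builds a one-row DataFrame with `width` String columns named c0, c1, ...
  def wideFrame(spark: SparkSession, width: Int): DataFrame =
    spark.range(1).select((0 until width).map(i => lit(i.toString).as(s"c$i")): _*)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("analysis-cost-demo")
      .getOrCreate()

    val df = wideFrame(spark, 200) // arbitrary width for illustration

    // One new DataFrame per column.
    val (_, slowMs) = timed {
      df.columns.foldLeft(df) { (acc, name) =>
        acc.withColumn(name, acc(name).cast(IntegerType))
      }
    }

    // One new DataFrame in total.
    val (_, fastMs) = timed {
      df.select(df.columns.map(name => df(name).cast(IntegerType).as(name)): _*)
    }

    println(s"withColumn x ${df.columns.length}: $slowMs ms, single select: $fastMs ms")
    spark.stop()
  }
}
```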
Summary
- The hidden cost of withColumn is Spark Catalyst’s analysis time.
- The time spent in Catalyst analysis is usually negligible, but it becomes an issue when there is a large number of transforms on a DataFrame. It’s not unusual for a model to have 1000 features that may require preprocessing.
- Don’t create a new DataFrame for each transform on Column. Create one at the end with DataFrame#select.