
How to checkpoint an RDD after a stage and before reaching an action?


leo9r
Hi,

I have a single-action job (saveAsObjectFile at the end) that includes several stages. One of those stages is an expensive join, "rdd1.join(rdd2)". I would like to checkpoint rdd1 right before the join to improve the stability of the job. However, what I'm seeing is that the job runs all the way to the end (saveAsObjectFile) without doing any checkpointing, and then re-runs the computation to checkpoint rdd1 (that is when I see the files saved to the checkpoint directory). I have no issue with the recomputation, given that I'm not caching rdd1, but checkpointing rdd1 after the join brings no benefit, because the whole DAG is executed in one piece and the job fails. If that is actually what is happening, what would be the best approach to solve this?
What I'm currently doing is manually saving rdd1 to HDFS right after the filter in line (4) and then loading it back right before the join in line (11). That prevents the job from failing by splitting it into 2 jobs (i.e., 2 actions). My expectation was that rdd1.checkpoint() in line (8) would have the same effect, but without the hassle of manually saving and loading intermediate files.

///////////////////////////////////////////////

(1)   val rdd1 = loadData1
(2)     .map
(3)     .groupByKey
(4)     .filter
(5)
(6)   val rdd2 = loadData2
(7)
(8)   rdd1.checkpoint()
(9)
(10)  rdd1
(11)    .join(rdd2)
(12)    .saveAsObjectFile(...)

/////////////////////////////////////////////

Thanks in advance,
Leo

Re: How to checkpoint an RDD after a stage and before reaching an action?

Liang-Chi Hsieh

Hi Leo,

An RDD is checkpointed only after a job that uses it has completed. Since you have only one job, rdd1 is checkpointed only after that job has finished.

To checkpoint rdd1 before the join, call rdd1.checkpoint() and then materialize rdd1 with an action of its own (e.g., rdd1.count). You may also want to cache rdd1 first to avoid recomputing it.
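
As a rough sketch of that suggestion (not your actual job): the object name CheckpointBeforeJoin, the run method, the checkpointDir parameter, and the toy data standing in for loadData1 and loadData2 are all made up for illustration, and the final count() stands in for saveAsObjectFile(...). The idea is that rdd1.count() runs as a separate first job, so the checkpoint files are written before the join starts.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

object CheckpointBeforeJoin {
  // Hypothetical helper: returns (was rdd1 checkpointed before the join?, number of joined records).
  def run(sc: SparkContext, checkpointDir: String): (Boolean, Long) = {
    // A checkpoint directory must be set before calling checkpoint().
    sc.setCheckpointDir(checkpointDir)

    // Toy stand-in for loadData1.map.groupByKey.filter (assumption, not the real data).
    val rdd1: RDD[(Int, Iterable[Int])] = sc.parallelize(1 to 1000)
      .map(i => (i % 10, i))
      .groupByKey()
      .filter { case (key, _) => key % 2 == 0 }

    // Toy stand-in for loadData2 (assumption).
    val rdd2: RDD[(Int, String)] = sc.parallelize(0 until 10).map(k => (k, s"value-$k"))

    // Optional: persist so that writing the checkpoint does not recompute rdd1.
    rdd1.persist(StorageLevel.MEMORY_AND_DISK)

    // Mark rdd1 for checkpointing; this must happen before any job runs on it.
    rdd1.checkpoint()

    // First job: materializes rdd1, after which the checkpoint is written.
    rdd1.count()
    val checkpointedBeforeJoin = rdd1.isCheckpointed

    // Second job: the join now reads rdd1 from the cache or the checkpoint files.
    // In the real job this action would be saveAsObjectFile(...).
    val joinedCount = rdd1.join(rdd2).count()

    (checkpointedBeforeJoin, joinedCount)
  }
}
```

This still splits the work into two jobs, just like the manual save-and-reload approach, but Spark manages the intermediate files. If rdd1 is too large to persist, dropping the persist call should still work, at the cost of computing rdd1 twice.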


Liang-Chi Hsieh | @viirya
Spark Technology Center
http://www.spark.tc/