spark-user mailing list archives

From Jacob Lynn <abebopare...@gmail.com>
Subject Re: RDD which was checkpointed is not checkpointed
Date Wed, 19 Aug 2020 18:02:20 GMT
Oops, you're right. My earlier answer (quoted below) was incorrect for your
case: it applies only to DataFrames (Spark 2.1+), not RDDs.
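
The difference between the two APIs discussed in this thread can be sketched
roughly as follows. This is a minimal sketch, not code from the original job:
the local master, the checkpoint path, the sample data, and the object name
CheckpointSketch are all assumptions made for illustration. It relies on the
documented behavior that RDD.checkpoint() only marks the RDD and returns Unit,
while Dataset.checkpoint() returns a new Dataset and is eager by default.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical stand-alone example; names and paths are illustrative only.
object CheckpointSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("checkpoint-sketch")
      .master("local[2]") // assumption: local run for demonstration
      .getOrCreate()
    val sc = spark.sparkContext

    // A checkpoint directory must be set before checkpointing.
    sc.setCheckpointDir("/tmp/spark-checkpoints") // hypothetical path

    // --- RDD: checkpoint() returns Unit and only marks the RDD ---
    val recordsRDD = sc.parallelize(1 to 10, 2).map(_ * 2)
    // The Spark docs recommend persisting first; otherwise writing the
    // checkpoint recomputes the RDD in a separate pass.
    recordsRDD.cache()
    recordsRDD.checkpoint()
    // No action has run yet, so nothing has been written at this point.
    println(s"before action: ${recordsRDD.isCheckpointed}")
    recordsRDD.count() // first action; the checkpoint is written after it
    println(s"after action: ${recordsRDD.isCheckpointed}")

    // --- Dataset/DataFrame: checkpoint() returns a NEW Dataset ---
    import spark.implicits._
    val df = recordsRDD.toDF("value")
    val checkpointedDf = df.checkpoint() // eager by default
    println(checkpointedDf.count())

    spark.stop()
  }
}
```

Persisting before checkpointing is what the RDD API documentation suggests to
avoid computing the RDD a second time when the checkpoint files are written,
which may matter when the transformation is not idempotent.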

On Wed, 19 Aug 2020 at 15:49, Ivan Petrov <capacytron@gmail.com> wrote:

> I think RDD.checkpoint() returns Unit, so that won't work.
>
> I found another way to make it work: call an action after checkpoint().
> val recordsRDD = convertToRecords(anotherRDD)
> recordsRDD.checkpoint()
> logger.info("checkpoint done")
> recordsRDD.count() // (!!!) the action that triggers the actual checkpoint
> logger.info(s"isCheckpointed? ${recordsRDD.isCheckpointed}, getCheckpointFile: ${recordsRDD.getCheckpointFile}")
> logger.info(s"recordsRDD.toDebugString: \n${recordsRDD.toDebugString}")
>
>     Output:
>     Job$ - checkpoint done (!!!)
>
>     Job$ - isCheckpointed? true, getCheckpointFile: Some(path)
>     Job$ - recordsRDD.toDebugString:
>     (2) MapPartitionsRDD[7] at map at  Job.scala:112 []
>
> The lineage is shorter now, but it still contains a single MapPartitionsRDD.
> I don't want Spark to rebuild the RDD from that MapPartitionsRDD; I want it
> to read the data directly from the checkpoint directory. The
> MapPartitionsRDD does non-idempotent ID generation, and I don't want it to
> run twice if a downstream task fails.
>
> On Wed, 19 Aug 2020 at 14:47, Jacob Lynn <abeboparebop@gmail.com> wrote:
>
>> Hi Ivan,
>>
>> Unlike cache/persist, checkpoint does not operate in-place but requires
>> the result to be assigned to a new variable. In your case:
>>
>> val recordsRDD = convertToRecords(anotherRDD).checkpoint()
>>
>> Best,
>> Jacob
>>
>> On Wed, 19 Aug 2020 at 14:39, Ivan Petrov <capacytron@gmail.com> wrote:
>>
>>> Hi!
>>> It seems I'm doing something wrong. I call .checkpoint() on an RDD, but
>>> it is not checkpointed.
>>> What am I doing wrong?
>>>
>>> val recordsRDD = convertToRecords(anotherRDD)
>>> recordsRDD.checkpoint()
>>> logger.info("checkpoint done")
>>>
>>> logger.info(s"isCheckpointed? ${recordsRDD.isCheckpointed}, getCheckpointFile: ${recordsRDD.getCheckpointFile}")
>>> logger.info(s"recordsRDD.toDebugString: \n${recordsRDD.toDebugString}")
>>>
>>> Output:
>>> Job$ - checkpoint done (!!!)
>>>
>>> But then.....
>>> Job$ - isCheckpointed? false, getCheckpointFile: None
>>> Job$ - recordsRDD.toDebugString:
>>> (2) MapPartitionsRDD[7] at map at  Job.scala:112 []
>>>  |  MapPartitionsRDD[6] at map at  Job.scala:111 []
>>>  |  MapPartitionsRDD[5] at map at ....scala:40 []
>>>  |  ShuffledRDD[4] at reduceByKey at ....scala:31 []
>>>  +-(2) MapPartitionsRDD[3] at flatMap at ...scala:30 []
>>>     |  MapPartitionsRDD[2] at map at ...:66 []
>>>
>>
