spark-user mailing list archives

From Ivan Petrov <capacyt...@gmail.com>
Subject Re: RDD which was checkpointed is not checkpointed
Date Wed, 19 Aug 2020 13:49:04 GMT
I think checkpoint() returns Unit, so that won't work.

I found another way to make it work: call an action after checkpoint().
val recordsRDD = convertToRecords(anotherRDD)
recordsRDD.checkpoint()
logger.info("checkpoint done")
recordsRDD.count() // (!!!)
logger.info(s"isCheckpointed? ${recordsRDD.isCheckpointed}, getCheckpointFile: ${recordsRDD.getCheckpointFile}")
logger.info(s"recordsRDD.toDebugString: \n${recordsRDD.toDebugString}")

Output:
Job$ - checkpoint done (!!!)

Job$ - isCheckpointed? true, getCheckpointFile: Some(path)
Job$ - recordsRDD.toDebugString:
(2) MapPartitionsRDD[7] at map at  Job.scala:112 []

But the lineage still contains a single MapPartitionsRDD. The lineage became
shorter, but I don't want Spark to rebuild the RDD from MapPartitionsRDD; I
want it to read the data directly from the checkpoint directory.
The MapPartitionsRDD step performs non-idempotent ID generation, and I don't
want it to run twice if a downstream task fails.
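
For reference, the pattern above can be sketched roughly as follows. This is a minimal sketch under stated assumptions, not the actual job: the local master, the temporary checkpoint directory, the object name CheckpointSketch, and the parallelize/map stand-in for convertToRecords(anotherRDD) are all hypothetical. It also persists the RDD before checkpointing, because the Spark API documentation recommends this: the checkpoint data is written by a separate job after the first action, so an unpersisted RDD is computed a second time. Persisting is best effort (cached blocks can be lost), so it reduces but may not fully rule out a second run of a non-idempotent step.

```scala
import java.nio.file.Files

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object CheckpointSketch {

  /** Returns (isCheckpointed before the action, isCheckpointed after the action). */
  def run(sc: SparkContext): (Boolean, Boolean) = {
    // checkpoint() needs a checkpoint directory; a temp dir is an assumption for this sketch.
    sc.setCheckpointDir(Files.createTempDirectory("spark-checkpoint").toString)

    // Hypothetical stand-in for convertToRecords(anotherRDD), which is not shown in the thread.
    val recordsRDD = sc.parallelize(1 to 100, 2).map(i => (i, s"record-$i"))

    // Persist first so the job that writes the checkpoint can reuse the cached partitions
    // instead of recomputing the map step.
    recordsRDD.persist(StorageLevel.MEMORY_AND_DISK)

    // checkpoint() returns Unit and only marks the RDD; nothing is written yet.
    recordsRDD.checkpoint()
    val before = recordsRDD.isCheckpointed

    // The first action materializes the RDD and then writes the checkpoint files.
    recordsRDD.count()
    val after = recordsRDD.isCheckpointed

    println(s"isCheckpointed before action: $before, after action: $after")
    println(s"getCheckpointFile: ${recordsRDD.getCheckpointFile}")
    println(s"toDebugString:\n${recordsRDD.toDebugString}")
    (before, after)
  }

  def main(args: Array[String]): Unit = {
    // A local master is assumed only so the sketch can run standalone.
    val conf = new SparkConf().setAppName("checkpoint-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try run(sc) finally sc.stop()
  }
}
```

According to the documentation of RDD.checkpoint(), once the checkpoint has been written all references to the parent RDDs are removed, which matches the shortened lineage in the output above.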




On Wed, 19 Aug 2020 at 14:47, Jacob Lynn <abeboparebop@gmail.com> wrote:

> Hi Ivan,
>
> Unlike cache/persist, checkpoint does not operate in-place but requires
> the result to be assigned to a new variable. In your case:
>
> val recordsRDD = convertToRecords(anotherRDD).checkpoint()
>
> Best,
> Jacob
>
> On Wed, 19 Aug 2020 at 14:39, Ivan Petrov <capacytron@gmail.com> wrote:
>
>> Hi!
>> It seems I am doing something wrong. I call .checkpoint() on an RDD, but
>> it is not checkpointed...
>> What am I doing wrong?
>>
>> val recordsRDD = convertToRecords(anotherRDD)
>> recordsRDD.checkpoint()
>> logger.info("checkpoint done")
>>
>> logger.info(s"isCheckpointed? ${recordsRDD.isCheckpointed}, getCheckpointFile: ${recordsRDD.getCheckpointFile}")
>> logger.info(s"recordsRDD.toDebugString: \n${recordsRDD.toDebugString}")
>>
>> Output:
>> Job$ - checkpoint done (!!!)
>>
>> But then.....
>> Job$ - isCheckpointed? false, getCheckpointFile: None
>> Job$ - recordsRDD.toDebugString:
>> (2) MapPartitionsRDD[7] at map at  Job.scala:112 []
>>  |  MapPartitionsRDD[6] at map at  Job.scala:111 []
>>  |  MapPartitionsRDD[5] at map at ....scala:40 []
>>  |  ShuffledRDD[4] at reduceByKey at ....scala:31 []
>>  +-(2) MapPartitionsRDD[3] at flatMap at ...scala:30 []
>>     |  MapPartitionsRDD[2] at map at ...:66 []
>>
>
