RDD which was checkpointed is not checkpointed

RDD which was checkpointed is not checkpointed

Ivan Petrov
Hi!
It seems like I'm doing something wrong. I call .checkpoint() on an RDD, but it isn't checkpointed...
What am I doing wrong?

val recordsRDD = convertToRecords(anotherRDD)
recordsRDD.checkpoint()
logger.info("checkpoint done")

logger.info(s"isCheckpointed? ${recordsRDD.isCheckpointed}, getCheckpointFile: ${recordsRDD.getCheckpointFile}")
logger.info(s"recordsRDD.toDebugString: \n${recordsRDD.toDebugString}")

Output:
Job$ - checkpoint done (!!!)

But then...
Job$ - isCheckpointed? false, getCheckpointFile: None
Job$ - recordsRDD.toDebugString:
(2) MapPartitionsRDD[7] at map at  Job.scala:112 []
 |  MapPartitionsRDD[6] at map at  Job.scala:111 []
 |  MapPartitionsRDD[5] at map at ....scala:40 []
 |  ShuffledRDD[4] at reduceByKey at ....scala:31 []
 +-(2) MapPartitionsRDD[3] at flatMap at ...scala:30 []
    |  MapPartitionsRDD[2] at map at ...:66 []

Re: RDD which was checkpointed is not checkpointed

abeboparebop
Hi Ivan,

Unlike cache/persist, checkpoint does not operate in-place but requires the result to be assigned to a new variable. In your case:

val recordsRDD = convertToRecords(anotherRDD).checkpoint()

Best,
Jacob

Re: RDD which was checkpointed is not checkpointed

Ivan Petrov
I think it returns Unit... it won't work.

I found another way to make it work: calling an action after checkpoint().
val recordsRDD = convertToRecords(anotherRDD)
recordsRDD.checkpoint()
logger.info("checkpoint done")
recordsRDD.count() // (!!!)
logger.info(s"isCheckpointed? ${recordsRDD.isCheckpointed}, getCheckpointFile: ${recordsRDD.getCheckpointFile}")
logger.info(s"recordsRDD.toDebugString: \n${recordsRDD.toDebugString}")

Output:
Job$ - checkpoint done (!!!)

Job$ - isCheckpointed? true, getCheckpointFile: Some(path)
Job$ - recordsRDD.toDebugString:
(2) MapPartitionsRDD[7] at map at Job.scala:112 []
But it still has a single MapPartitionsRDD in the lineage. The lineage became shorter, but I don't want Spark to rebuild the RDD from that MapPartitionsRDD; I want it to take the data directly from the checkpoint dir.
The MapPartitionsRDD has non-idempotent id generation, and I don't want to run it twice in case of a downstream task failure.
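For what it's worth, the RDD.checkpoint() scaladoc recommends persisting the RDD before checkpointing it, because the checkpoint files are written by a separate job that otherwise recomputes the whole lineage. A minimal sketch, reusing the (hypothetical) names from the code above and assuming a checkpoint dir is already set:

import org.apache.spark.storage.StorageLevel

val recordsRDD = convertToRecords(anotherRDD)
recordsRDD.persist(StorageLevel.MEMORY_AND_DISK) // keep the computed partitions around
recordsRDD.checkpoint()                          // only marks the RDD for checkpointing
recordsRDD.count()                               // the first action computes the RDD once and caches it;
                                                 // the checkpoint job then writes the cached partitions
recordsRDD.unpersist()                           // optional: the checkpoint now covers recovery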

Re: RDD which was checkpointed is not checkpointed

Russell Spitzer
Checkpoint is lazy and needs an action to actually do the work. The method just marks the RDD, as noted in the doc you posted.

Call an action twice. The second run should use the checkpoint.
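A minimal sketch of that pattern (the checkpoint directory path here is just a placeholder):

sc.setCheckpointDir("/tmp/checkpoints") // must be set before checkpointing

val rdd = sc.parallelize(1 to 100).map(_ * 2)
rdd.checkpoint()                 // lazy: only marks the RDD, nothing is written yet
println(rdd.isCheckpointed)      // false
rdd.count()                      // the first action triggers the checkpoint write
println(rdd.isCheckpointed)      // true
println(rdd.getCheckpointFile)   // Some(file:/tmp/checkpoints/...)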

Re: RDD which was checkpointed is not checkpointed

Ivan Petrov
I did it, and I can see the lineage change.

BEFORE calling the action. No success:

Job$ - isCheckpointed? false, getCheckpointFile: None
Job$ - recordsRDD.toDebugString:
(2) MapPartitionsRDD[7] at map at  Job.scala:112 []
 |  MapPartitionsRDD[6] at map at  Job.scala:111 []
 |  MapPartitionsRDD[5] at map at ....scala:40 []
 |  ShuffledRDD[4] at reduceByKey at ....scala:31 []
 +-(2) MapPartitionsRDD[3] at flatMap at ...scala:30 []
    |  MapPartitionsRDD[2] at map at ...:66 []

AFTER calling the action. Nice, it works!
Job$ - isCheckpointed? true, getCheckpointFile: Some(path)
Job$ - recordsRDD.toDebugString:
(2) MapPartitionsRDD[7] at map at Job.scala:112 []


The lineage now contains only one stage, but I want to get rid of it too. This stage happens right before the checkpoint. Will Spark try to re-run it in case of a task failure AFTER the checkpoint?
My expectation is that Spark will read directly from the checkpoint dir; it shouldn't have to do anything with the previous MapPartitionsRDD[7] at map at Job.scala:112.

Re: RDD which was checkpointed is not checkpointed

Russell Spitzer
Spark determines whether it can use the checkpoint at runtime, so you'll see it in the UI but not in the plan: you are looking at the plan before the job actually runs, which is when Spark checks whether it can use the checkpoint in the lineage.

Here is a two-stage job as an example:

scala> val x = sc.parallelize(Seq("foo","bar"))
x: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[9] at parallelize at <console>:24

scala> val y = x.repartition(3)
y: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at repartition at <console>:25

scala> y.checkpoint

scala> y.count
res12: Long = 2

[Spark UI screenshots: the first count runs the full two-stage job]

scala> y.count
res13: Long = 2

[Spark UI screenshot: the second count shows the first stage as skipped]

Notice that we were able to skip the first stage: when Stage 11 looked for its dependencies, it found a checkpointed version of the partitioned data, so it didn't need to repartition again. This turns my two-stage job into a two-stage job with one stage skipped, effectively a one-stage job.
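For completeness, once the action has run, the checkpoint also appears in the RDD lineage itself. A rough sketch of what y.toDebugString should print for the example above (ids and exact wording vary by Spark version):

scala> y.toDebugString
(3) MapPartitionsRDD[13] at repartition at <console>:25 []
 |  ReliableCheckpointRDD[14] at count at <console>:26 []

The MapPartitionsRDD at the top is the RDD node itself; its only parent is now the checkpoint data on disk, so recomputing a lost partition reads from the checkpoint dir rather than re-running the original computation.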

Re: RDD which was checkpointed is not checkpointed

Ivan Petrov
Awesome, thanks for explaining it.

Re: RDD which was checkpointed is not checkpointed

abeboparebop
In reply to this post by Ivan Petrov
Oops, you're right. My incorrect answer above applies only to DataFrames (2.1+), not RDDs.
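For the Dataset API, a short sketch of the difference (paths are placeholders): Dataset.checkpoint returns a new, already-checkpointed Dataset and is eager by default, which is why the result has to be reassigned there.

spark.sparkContext.setCheckpointDir("/tmp/checkpoints")

val df = spark.range(100).toDF("id")
val checkpointed = df.checkpoint()                  // eager: materializes and returns a new Dataset
val lazyCheckpointed = df.checkpoint(eager = false) // or defer the write to the next action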
