Can we get the partition Index in an UDF

JayeshLalwani

We are trying to add a column of randomly generated data to a DataFrame. We want to control the seed, so that multiple runs of the same transformation generate the same output. We also want each partition to generate different random numbers.

 

This is easy to do with mapPartitionsWithIndex: for each partition, we create a random number generator seeded with a global seed plus the partition index. The problem is that mapPartitionsWithIndex is a black box to the optimizer, so any filter predicates added after it don’t get pushed down to the source.
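For reference, the per-partition seeding described above can be sketched roughly as follows. The object name, the `withRandom` helper and the seed value 42 are made up for illustration; only the (index, iterator) shape comes from mapPartitionsWithIndex itself.

```scala
import scala.util.Random

object SeededPartitions {
  // Hypothetical global seed, fixed so that repeated runs give the same output.
  val globalSeed: Long = 42L

  // Same (Int, Iterator[T]) => Iterator[U] shape that mapPartitionsWithIndex expects.
  def withRandom[T](index: Int, rows: Iterator[T]): Iterator[(T, Double)] = {
    // One generator per partition, seeded with global seed + partition index.
    val rng = new Random(globalSeed + index)
    rows.map(row => (row, rng.nextDouble()))
  }
}

// With an RDD named rdd in scope, this would be applied as:
//   rdd.mapPartitionsWithIndex((i, it) => SeededPartitions.withRandom(i, it))
```

The output is only repeatable if the partitioning and the row order inside each partition are the same from run to run.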

 

If we implement this function as a UDF, the filters do get pushed down to the source, but we don’t have the partition index.

 

Yes, I know we could apply mapPartitionsWithIndex after the filter. That is what we will probably end up doing. I was wondering if there is a way of implementing this without having to move the filter around.



Re: Can we get the partition Index in an UDF

Vadim Semenov-2
Try using `TaskContext`:

import org.apache.spark.TaskContext
// Call this inside the UDF body so that it runs on the executor, within the task
val partitionId = TaskContext.getPartitionId()
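A rough sketch of how this could be wired into a UDF that keeps one generator per task, so each partition gets its own seeded sequence. The object name, the seed value and the thread-local caching are my own assumptions, not something Spark provides; only `TaskContext.get()`, `TaskContext.getPartitionId()`, `taskAttemptId()` and `udf` are Spark APIs.

```scala
import org.apache.spark.TaskContext
import org.apache.spark.sql.functions.udf
import scala.util.Random

object PartitionSeededRandom {
  // Hypothetical global seed; in practice it would come from job configuration.
  val globalSeed: Long = 42L

  // (task attempt id, generator) last used on this executor thread.
  private val state = new ThreadLocal[(Long, Random)]

  def nextDouble(): Double = {
    val ctx = TaskContext.get() // null when called outside a running task
    val taskId = if (ctx == null) -1L else ctx.taskAttemptId()
    val partitionId = TaskContext.getPartitionId() // 0 when there is no active task
    val cached = state.get()
    val rng =
      if (cached != null && cached._1 == taskId) cached._2
      else {
        // New task on this thread: start over from global seed + partition index.
        val fresh = new Random(globalSeed + partitionId)
        state.set((taskId, fresh))
        fresh
      }
    rng.nextDouble()
  }

  // Zero-argument UDF; usage would look like df.withColumn("r", randomUdf()).
  val randomUdf = udf(() => nextDouble())
}
```

This is only repeatable when the partitioning and the row order within each partition are stable between runs. The UDF is deliberately left as deterministic from Spark's point of view; as far as I know, marking it with `asNondeterministic()` may stop the optimizer from pushing filters past it, which would defeat the purpose here.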

(In reply to the message from Jayesh Lalwani, sent Mon, Jun 25, 2018 at 11:17 AM.)


