Saving dataframes with partitionBy: append partitions, overwrite within each


Saving dataframes with partitionBy: append partitions, overwrite within each

peay
Hello,

I am trying to use data_frame.write.partitionBy("day").save("dataset.parquet") to write a dataset while splitting by day.

I would like to run a Spark job to process, e.g., a month:
dataset.parquet/day=2017-01-01/
...

and then run another Spark job to add another month using the same folder structure, getting me
dataset.parquet/day=2017-01-01/
...
dataset.parquet/day=2017-02-01/
...

However (sketched below):
- with save mode "overwrite", when I process the second month, all of dataset.parquet/ gets removed and I lose whatever was already computed for the previous month.
- with save mode "append", I can't get idempotence: if I run the job for a given month twice, I get duplicate data in all the subfolders for that month.

Is there a way to do "append" in terms of the subfolders from partitionBy, but overwrite within each such partition? Any help would be appreciated.

Thanks!

Re: Saving dataframes with partitionBy: append partitions, overwrite within each

Vadim Semenov
As an alternative: checkpoint the dataframe, collect the days, then delete the corresponding directories using the Hadoop FileUtils, and then write the dataframe.
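
A rough sketch of this approach, assuming a `day` partition column and the base path "dataset.parquet" from the original message (I use the Hadoop FileSystem API to delete the directories, which should be interchangeable with FileUtil here; `dataFrame` and `spark` are placeholder names):

import org.apache.hadoop.fs.{FileSystem, Path}

// cache (or checkpoint) so collecting the days does not recompute the dataframe
val df = dataFrame.cache()

// collect the days this run produced
val days = df.select("day").distinct().collect().map(_.get(0).toString)

// delete the matching partition directories so a re-run stays idempotent
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
days.foreach(day => fs.delete(new Path(s"dataset.parquet/day=$day"), true))

// now append: only the freshly written days end up in the dataset
df.write.mode("append").partitionBy("day").save("dataset.parquet")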



Re: Saving dataframes with partitionBy: append partitions, overwrite within each

Pavel Knoblokh
In reply to this post by peay
If your processing task inherently processes input data by month, you may want to "manually" partition the output data by month as well as by day, that is, save it under a path that includes the given month, e.g. "dataset.parquet/month=01". Then you will be able to use the overwrite mode with each month partition. Hope this is of some help.
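
A sketch of this suggestion, assuming each job knows the month it is processing (the `month` value below is a placeholder):

// write only this job's month under its own sub-directory; "overwrite" then
// replaces just that month and leaves the other month directories untouched
val month = "2017-01"
dataFrame
  .write
  .mode("overwrite")
  .partitionBy("day")
  .save(s"dataset.parquet/month=$month")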

--
Pavel Knoblokh



Re: Saving dataframes with partitionBy: append partitions, overwrite within each

Nirav Patel
Hi Peay,

Have you found a better solution yet? I am having the same issue.

The following says it works with Spark 2.1 onward, but only when you use sqlContext and not a DataFrame:

Thanks,
Nirav


Re: Saving dataframes with partitionBy: append partitions, overwrite within each

Koert Kuipers
This works for dataframes with Spark 2.3 by changing a global setting, and will be configurable per write in 2.4.
see:
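
A sketch of the setting in question as I understand it (dynamic partition overwrite; `dataFrame` is a placeholder name):

// Spark 2.3: with this global setting, "overwrite" only replaces the
// partitions present in the dataframe being written
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

dataFrame
  .write
  .mode("overwrite")
  .partitionBy("day")
  .save("dataset.parquet")

// in 2.4 the same behaviour should become selectable per write on the
// DataFrameWriter instead of via the global setting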



Re: Saving dataframes with partitionBy: append partitions, overwrite within each

Nirav Patel
Thanks Koert, I'll check that out when we can update to 2.3.

Meanwhile, I am trying a Hive SQL (INSERT OVERWRITE) statement to insert-overwrite multiple partitions (without losing the existing ones).

It's giving me issues around partition columns.

    dataFrame.createOrReplaceTempView("updateTable") //here dataframe contains values from multiple partitions.

The dataFrame also has the partition columns, but I can't get any of the following to execute:

insert overwrite table $tableName PARTITION(P1, P2) select * from updateTable

org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.Table.ValidationFailureSemanticException: Partition spec {p1=, p2=, P1=__HIVE_DEFAULT_PARTITION__, P2=1} contains non-partition columns;


Is the above the right approach to update multiple partitions? Or should I be more specific, updating each partition with a separate command like the following:

// Pseudo code; yet to try. Collect the distinct partition keys to the driver
// first, since spark.sql cannot be called from inside an RDD closure, then run
// one INSERT OVERWRITE per (P1, P2) partition.
df.createOrReplaceTempView("updateTable")

val keys = df.select("P1", "P2").distinct().collect()
keys.foreach { row =>
  val (p1, p2) = (row.get(0), row.get(1))
  spark.sql(
    s"""INSERT OVERWRITE TABLE stats PARTITION (P1 = '$p1', P2 = '$p2')
       |SELECT A, B, C FROM updateTable WHERE P1 = '$p1' AND P2 = '$p2'
     """.stripMargin)
}

Regards,
Nirav



Re: Saving dataframes with partitionBy: append partitions, overwrite within each

Nirav Patel
I tried the following to explicitly specify the partition columns in the SQL statement, and also tried different cases (upper and lower) for the partition columns.

insert overwrite table $tableName PARTITION(P1, P2) select A, B, C, P1, P2 from updateTable

Still getting: 

Caused by: org.apache.hadoop.hive.ql.metadata.Table$ValidationFailureSemanticException: Partition spec {p1=, p2=, P1=1085, P2=164590861} contains non-partition columns
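
I am not certain of the root cause, but the mixed-case entries in that partition spec ({p1=, p2=, P1=...}) suggest a case mismatch between the partition column names as the Hive metastore stores them (lowercase) and as they appear in the query. A sketch of a workaround to try (untested against this setup; the table and column names are taken from the messages above): enable Hive dynamic partitioning and reference the partition columns in lowercase, listed last in the SELECT.

// allow the partition values to come from the SELECT itself
spark.sql("SET hive.exec.dynamic.partition=true")
spark.sql("SET hive.exec.dynamic.partition.mode=nonstrict")

// lowercase partition column names, partition columns last in the SELECT
spark.sql("""
  INSERT OVERWRITE TABLE stats PARTITION (p1, p2)
  SELECT A, B, C, P1 AS p1, P2 AS p2 FROM updateTable
""")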


