Append In-Place to S3

Append In-Place to S3

benassi
I have a situation where I am trying to add only new rows to an existing data set that lives in S3 as gzipped parquet files, looping and appending for each hour of the day. First, I create a DF from the existing data, then I use a query to create another DF with only the data that is new. Here is the code snippet.

df = spark.read.parquet(existing_data_path)
df.createOrReplaceTempView('existing_data')
new_df = spark.read.parquet(new_data_path)
new_df.createOrReplaceTempView('new_data')
append_df = spark.sql(
        """
        WITH ids AS (
            SELECT DISTINCT
                source,
                source_id,
                target,
                target_id
            FROM new_data i
            LEFT ANTI JOIN existing_data im
            ON i.source = im.source
            AND i.source_id = im.source_id
            AND i.target = im.target
            AND i.target = im.target_id
        """
    )
append_df.coalesce(1).write.parquet(existing_data_path, mode='append', compression='gzip')

I thought this would append only new rows and keep the data unique, but I am seeing many duplicates. Can someone help me and tell me what I am doing wrong?

Thanks,
Ben

Re: Append In-Place to S3

jjayadeep
Benjamin,

Append mode will append the "new" data to the existing data without removing duplicates. You would need to overwrite the file every time if you need unique values.
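
For example, a rough sketch of that dedupe-and-overwrite approach might look like the following (the staging path is a placeholder, and unionByName needs Spark 2.3+):

# read both data sets, combine them, and drop duplicates on the key columns
existing_df = spark.read.parquet(existing_data_path)
new_df = spark.read.parquet(new_data_path)
combined_df = existing_df.unionByName(new_df).dropDuplicates(
    ['source', 'source_id', 'target', 'target_id'])
# write to a staging location first; overwriting the path you are still reading
# from in the same job is unsafe because Spark reads lazily
combined_df.write.parquet(staging_path, mode='overwrite', compression='gzip')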

Thanks,
Jayadeep

Re: Append In-Place to S3

vincent gromakowski
In reply to this post by benassi

Structured Streaming can provide idempotent, exactly-once writes to parquet, but I don't know how it does that under the hood.
Without it, you need to load your entire dataset, dedupe it, and then write the whole thing back. That overhead can be minimized by partitioning the output files.
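
For instance, if the data carried a date or hour partition column, dynamic partition overwrite (available since Spark 2.3) could restrict the rewrite to the partitions the new data touches. The names below (deduped_df, event_date) are placeholders, assuming deduped_df already holds the combined, deduplicated data for the affected partitions:

# only the partitions present in deduped_df are replaced; the rest are untouched
spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'dynamic')
deduped_df.write.partitionBy('event_date') \
    .parquet(existing_data_path, mode='overwrite', compression='gzip')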


Re: Append In-Place to S3

benassi
In reply to this post by jjayadeep
Hi Jay,

Thanks for your response. Are you saying to append the new data, then remove the duplicates from the whole data set afterwards, overwriting the existing data set with the new, appended data set? I will give that a try.

Cheers,
Ben

Re: Append In-Place to S3

Aakash Basu-2
As Jay correctly suggested, if you're joining against the existing data then overwrite; append by itself won't remove dups.

I think, in this scenario, just change it to write.mode('overwrite') because you're already reading the old data and your job would be done.
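
A rough sketch of that change (assuming df and append_df share the same columns; staging_path is a placeholder):

# union the existing data with the new-only rows and overwrite instead of append;
# Spark reads lazily, so write to a staging path rather than straight back over
# existing_data_path, or the read may fail or lose data
final_df = df.union(append_df)
final_df.coalesce(1).write.parquet(staging_path, mode='overwrite', compression='gzip')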


Re: Append In-Place to S3

Tayler Lawrence Jones
The issue is not append vs. overwrite - perhaps those responders are not familiar with anti join semantics. Further, overwrite on S3 is a bad pattern due to S3's eventual consistency issues.

First, your SQL query is wrong: you never close the parenthesis of the CTE (the "WITH" part). In fact, it looks like you don't need the CTE at all, and the query should fail to parse. If it does parse, I would open a bug on the Spark JIRA.
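
For reference, a version of the query without the CTE might look like this (just a sketch, keeping your join columns; note that the last condition in the original, i.target = im.target_id, also looks like it should be i.target_id = im.target_id):

append_df = spark.sql(
        """
        SELECT DISTINCT
            i.source,
            i.source_id,
            i.target,
            i.target_id
        FROM new_data i
        LEFT ANTI JOIN existing_data im
        ON i.source = im.source
        AND i.source_id = im.source_id
        AND i.target = im.target
        AND i.target_id = im.target_id
        """
    )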

Can you provide the query that you are using to detect duplication so I can see if your deduplication logic matches the detection query? 

-TJ

Re: Append In-Place to S3

ayan guha
I do not use anti join semantics, but you can use a left outer join and then filter out the rows where the right side is null. Your data may have dups on the individual columns, but it should not have dups on the composite key, i.e. all the columns put together.
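
A sketch of that, reusing the same temp views and join columns as the original query:

append_df = spark.sql(
        """
        SELECT DISTINCT
            i.source,
            i.source_id,
            i.target,
            i.target_id
        FROM new_data i
        LEFT OUTER JOIN existing_data im
        ON i.source = im.source
        AND i.source_id = im.source_id
        AND i.target = im.target
        AND i.target_id = im.target_id
        WHERE im.source IS NULL
        """
    )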

--
Best Regards,
Ayan Guha

Re: Append In-Place to S3

Tayler Lawrence Jones
A left join with a null filter is only the same as a left anti join if the join keys are guaranteed unique in the existing data. Since Hive tables on S3 offer no uniqueness guarantees outside of your processing code, I recommend using a left anti join over a left join + null filter.
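
In the DataFrame API that would be roughly (a sketch, assuming the two DataFrames from the original snippet):

key_cols = ['source', 'source_id', 'target', 'target_id']
# left anti join keeps only the rows from new_df that have no match in df
append_df = new_df.join(df, on=key_cols, how='left_anti').dropDuplicates(key_cols)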

-TJ

Re: Append In-Place to S3

Tayler Lawrence Jones
Sorry, actually my last message is not true for anti join; I was thinking of semi join.

-TJ

Re: Append In-Place to S3

benassi
I tried a different tactic. I still append based on the same query, but I add another deduping step afterwards, writing to a staging directory and then overwriting back. Luckily, the data is small enough for this to happen fast.
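
Roughly, the extra step looks like this (a sketch; staging_path is a placeholder):

# after the hourly appends, dedupe the full data set through a staging location
full_df = spark.read.parquet(existing_data_path)
deduped_df = full_df.dropDuplicates(['source', 'source_id', 'target', 'target_id'])
deduped_df.write.parquet(staging_path, mode='overwrite', compression='gzip')
# then read the staged copy back and overwrite the original location with it
spark.read.parquet(staging_path).write.parquet(existing_data_path, mode='overwrite', compression='gzip')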

Cheers,
Ben
