spark-user mailing list archives

From ayan guha <guha.a...@gmail.com>
Subject Re: Append In-Place to S3
Date Sun, 03 Jun 2018 21:46:56 GMT
I do not use anti join semantics, but you can use a left outer join and then
keep only the rows where the right side is null. Your data may have
duplicates on the individual columns, but it should not have duplicates on
the composite key, i.e. all columns put together.
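
A minimal sketch of that left outer join plus null filter, for illustration
only. It uses SQLite from the Python standard library as a stand-in for Spark
SQL so that it runs without a cluster; the sample rows are made up, and the
filter assumes the source column is never null in existing_data. Each key
column is compared with its own counterpart (target_id with target_id), and
DISTINCT removes repeats within the new batch itself.

```python
import sqlite3

# In-memory tables standing in for the two Spark temp views.
# The rows are hypothetical sample data, not taken from the thread.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE existing_data (source TEXT, source_id INTEGER, target TEXT, target_id INTEGER);
    CREATE TABLE new_data (source TEXT, source_id INTEGER, target TEXT, target_id INTEGER);
    INSERT INTO existing_data VALUES ('a', 1, 'x', 10), ('b', 2, 'y', 20);
    INSERT INTO new_data VALUES
        ('a', 1, 'x', 10),  -- already stored
        ('c', 3, 'z', 30),  -- new
        ('c', 3, 'z', 30),  -- new, repeated within the batch
        ('a', 1, 'x', 11);  -- new: differs from a stored row only on target_id
""")

# Left outer join on the full composite key, then keep only the rows that
# found no match on the right side (the right-side columns come back NULL).
rows_to_append = conn.execute("""
    SELECT DISTINCT i.source, i.source_id, i.target, i.target_id
    FROM new_data i
    LEFT OUTER JOIN existing_data im
        ON  i.source    = im.source
        AND i.source_id = im.source_id
        AND i.target    = im.target
        AND i.target_id = im.target_id
    WHERE im.source IS NULL
    ORDER BY i.source, i.source_id, i.target, i.target_id
""").fetchall()

print(rows_to_append)
```

The same query shape should carry over to spark.sql() against the
existing_data and new_data temp views, since it only relies on a standard
LEFT OUTER JOIN and an IS NULL filter.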

On Mon, 4 Jun 2018 at 6:42 am, Tayler Lawrence Jones <t.jonesd289@gmail.com>
wrote:

> The issue is not append vs. overwrite; perhaps those responders do not
> know anti join semantics. Further, overwrite on S3 is a bad pattern due to
> S3 eventual consistency issues.
>
> First, your SQL query is wrong, as you don’t close the parenthesis of the
> CTE (the “with” part). In fact, it looks like you don’t need that “with” at
> all, and the query should fail to parse. If it does parse, I would open a
> bug on the Spark JIRA.
>
> Can you provide the query that you are using to detect duplication so I
> can see if your deduplication logic matches the detection query?
>
> -TJ
>
> On Sat, Jun 2, 2018 at 10:22 Aakash Basu <aakash.spark.raj@gmail.com>
> wrote:
>
>> As Jay suggested correctly, if you're joining then overwrite otherwise
>> only append as it removes dups.
>>
>> I think, in this scenario, just change it to write.mode('overwrite')
>> because you're already reading the old data and your job would be done.
>>
>>
>> On Sat 2 Jun, 2018, 10:27 PM Benjamin Kim, <bbuild11@gmail.com> wrote:
>>
>>> Hi Jay,
>>>
>>> Thanks for your response. Are you saying to append the new data, then
>>> remove the duplicates from the whole data set, and finally overwrite the
>>> existing data set with the deduplicated result? I will give that
>>> a try.
>>>
>>> Cheers,
>>> Ben
>>>
>>> On Fri, Jun 1, 2018 at 11:49 PM Jay <jayadeep.jayaraman@gmail.com>
>>> wrote:
>>>
>>>> Benjamin,
>>>>
>>>> The append will append the "new" data to the existing data without
>>>> removing the duplicates. You would need to overwrite the file every time
>>>> if you need unique values.
>>>>
>>>> Thanks,
>>>> Jayadeep
>>>>
>>>> On Fri, Jun 1, 2018 at 9:31 PM Benjamin Kim <bbuild11@gmail.com> wrote:
>>>>
>>>>> I have a situation where I am trying to add only new rows to an
>>>>> existing data set that lives in S3 as gzipped parquet files, looping
>>>>> and appending for each hour of the day. First, I create a DF from the
>>>>> existing data, then I use a query to create another DF with the data
>>>>> that is new. Here is the code snippet.
>>>>>
>>>>> df = spark.read.parquet(existing_data_path)
>>>>> df.createOrReplaceTempView('existing_data')
>>>>> new_df = spark.read.parquet(new_data_path)
>>>>> new_df.createOrReplaceTempView('new_data')
>>>>> append_df = spark.sql(
>>>>>         """
>>>>>         WITH ids AS (
>>>>>             SELECT DISTINCT
>>>>>                 source,
>>>>>                 source_id,
>>>>>                 target,
>>>>>                 target_id
>>>>>             FROM new_data i
>>>>>             LEFT ANTI JOIN existing_data im
>>>>>             ON i.source = im.source
>>>>>             AND i.source_id = im.source_id
>>>>>             AND i.target = im.target
>>>>>             AND i.target = im.target_id
>>>>>         """
>>>>>     )
>>>>> append_df.coalesce(1).write.parquet(existing_data_path, mode='append',
>>>>> compression='gzip')
>>>>>
>>>>>
>>>>> I thought this would append new rows and keep the data unique, but I
>>>>> am seeing many duplicates. Can someone help me with this and tell me
>>>>> what I am doing wrong?
>>>>>
>>>>> Thanks,
>>>>> Ben
>>>>>
>>>> --
Best Regards,
Ayan Guha
