spark-user mailing list archives

From Tayler Lawrence Jones <t.jonesd...@gmail.com>
Subject Re: Append In-Place to S3
Date Sun, 03 Jun 2018 22:02:45 GMT
Sorry, my last message is actually not true for anti join; I was thinking of
semi join.
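
To make the distinction concrete, here is a minimal plain-Python model of the
four join shapes (no Spark required). The `id` column and the sample rows are
made up for illustration, and keys are assumed non-null, so SQL NULL handling
is not modeled. It is only a sketch of the semantics, not of how Spark
executes the joins.

```python
# Plain-Python model of the join semantics discussed in this thread.
# Rows are dicts; `key` is a hypothetical join column. Keys are assumed non-null.

def left_anti(left, right, key):
    """Left rows whose key has no match on the right."""
    right_keys = {r[key] for r in right}
    return [l for l in left if l[key] not in right_keys]

def left_outer_null_filter(left, right, key):
    """Left outer join, then keep only rows where the right side is null."""
    joined = []
    for l in left:
        matches = [r for r in right if r[key] == l[key]]
        if matches:
            joined.extend((l, r) for r in matches)  # one output row per match
        else:
            joined.append((l, None))                # unmatched: exactly one row
    return [l for l, r in joined if r is None]

def left_semi(left, right, key):
    """Left rows whose key has at least one match on the right, each once."""
    right_keys = {r[key] for r in right}
    return [l for l in left if l[key] in right_keys]

def inner_left_columns(left, right, key):
    """Inner join projected to the left columns: one row per matching pair."""
    return [l for l in left for r in right if r[key] == l[key]]

new_data = [{"id": 1}, {"id": 2}, {"id": 3}]
existing = [{"id": 1}, {"id": 1}, {"id": 2}]  # id 1 is duplicated on the right

anti = left_anti(new_data, existing, "id")
outer_filtered = left_outer_null_filter(new_data, existing, "id")
semi = left_semi(new_data, existing, "id")
inner = inner_left_columns(new_data, existing, "id")
```

In this model `anti` and `outer_filtered` contain the same rows even though
the right side has duplicate keys, because an unmatched left row is emitted
exactly once. Duplicates on the right only matter when comparing `semi` with
`inner`, where the inner join repeats the row for id 1.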

-TJ

On Sun, Jun 3, 2018 at 14:57 Tayler Lawrence Jones <t.jonesd289@gmail.com>
wrote:

> A left join with null filter is only the same as a left anti join if the
> join keys can be guaranteed unique in the existing data. Since Hive tables
> on S3 offer no uniqueness guarantees outside of your processing code, I
> recommend using left anti join over left join + null filter.
>
> -TJ
>
> On Sun, Jun 3, 2018 at 14:47 ayan guha <guha.ayan@gmail.com> wrote:
>
>> I do not use anti join semantics, but you can use a left outer join and
>> then keep only the rows where the right side is null. Your data may have
>> dups on the columns separately, but it should not have dups on the composite
>> key, i.e. all columns put together.
>>
>> On Mon, 4 Jun 2018 at 6:42 am, Tayler Lawrence Jones <
>> t.jonesd289@gmail.com> wrote:
>>
>>> The issue is not append vs. overwrite; perhaps those responders do
>>> not know anti join semantics. Further, overwrite on S3 is a bad pattern due
>>> to S3 eventual consistency issues.
>>>
>>> First, your SQL query is wrong as you don’t close the parenthesis of the
>>> CTE (“WITH” part). In fact, it looks like you don’t need that WITH at all,
>>> and the query should fail to parse. If that does parse, I would open a bug
>>> on the Spark JIRA.
>>>
>>> Can you provide the query that you are using to detect duplication so I
>>> can see if your deduplication logic matches the detection query?
>>>
>>> -TJ
>>>
>>> On Sat, Jun 2, 2018 at 10:22 Aakash Basu <aakash.spark.raj@gmail.com>
>>> wrote:
>>>
>>>> As Jay correctly suggested, if you're joining then overwrite; otherwise
>>>> only append, as it removes dups.
>>>>
>>>> I think, in this scenario, just change it to write.mode('overwrite')
>>>> because you're already reading the old data and your job would be done.
>>>>
>>>>
>>>> On Sat 2 Jun, 2018, 10:27 PM Benjamin Kim, <bbuild11@gmail.com> wrote:
>>>>
>>>>> Hi Jay,
>>>>>
>>>>> Thanks for your response. Are you saying to append the new data and
>>>>> then remove the duplicates from the whole data set afterwards, overwriting
>>>>> the existing data set with the new data set with appended values? I will
>>>>> give that a try.
>>>>>
>>>>> Cheers,
>>>>> Ben
>>>>>
>>>>> On Fri, Jun 1, 2018 at 11:49 PM Jay <jayadeep.jayaraman@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Benjamin,
>>>>>>
>>>>>> The append will append the "new" data to the existing data without
>>>>>> removing the duplicates. You would need to overwrite the file every time
>>>>>> if you need unique values.
>>>>>>
>>>>>> Thanks,
>>>>>> Jayadeep
>>>>>>
>>>>>> On Fri, Jun 1, 2018 at 9:31 PM Benjamin Kim <bbuild11@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I have a situation where I am trying to add only new rows to an
>>>>>>> existing data set that lives in S3 as gzipped parquet files, looping and
>>>>>>> appending for each hour of the day. First, I create a DF from the existing
>>>>>>> data, then I use a query to create another DF with the data that is new.
>>>>>>> Here is the code snippet.
>>>>>>>
>>>>>>> df = spark.read.parquet(existing_data_path)
>>>>>>> df.createOrReplaceTempView('existing_data')
>>>>>>> new_df = spark.read.parquet(new_data_path)
>>>>>>> new_df.createOrReplaceTempView('new_data')
>>>>>>> append_df = spark.sql(
>>>>>>>         """
>>>>>>>         WITH ids AS (
>>>>>>>             SELECT DISTINCT
>>>>>>>                 source,
>>>>>>>                 source_id,
>>>>>>>                 target,
>>>>>>>                 target_id
>>>>>>>             FROM new_data i
>>>>>>>             LEFT ANTI JOIN existing_data im
>>>>>>>             ON i.source = im.source
>>>>>>>             AND i.source_id = im.source_id
>>>>>>>             AND i.target = im.target
>>>>>>>             AND i.target = im.target_id
>>>>>>>         """
>>>>>>>     )
>>>>>>> append_df.coalesce(1).write.parquet(existing_data_path,
>>>>>>> mode='append', compression='gzip')
>>>>>>>
>>>>>>>
>>>>>>> I thought this would append new rows and keep the data unique, but I
>>>>>>> am seeing many duplicates. Can someone help me with this and tell me what
>>>>>>> I am doing wrong?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Ben
>>>>>>>
>>>>>> --
>> Best Regards,
>> Ayan Guha
>>
>
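
For the query in the original post quoted above, note that besides the
unclosed WITH, the last join condition compares `i.target` with
`im.target_id` rather than `i.target_id` with `im.target_id`. With that
condition few, if any, rows would match, so the anti join would pass nearly
everything through; that may explain the duplicates, though it is not
confirmed in the thread. One way to avoid this kind of copy-paste slip is to
generate the ON clause from a list of key columns. The sketch below does
that; `build_append_query` and `KEY_COLUMNS` are hypothetical names
introduced here, and only the view and column names come from the original
post.

```python
# Hypothetical helper: build the anti-join query from a list of key columns
# so that every ON condition compares a column with its namesake.
KEY_COLUMNS = ["source", "source_id", "target", "target_id"]

def build_append_query(keys, new_view="new_data", existing_view="existing_data"):
    """Return SQL selecting distinct key tuples in new_view absent from existing_view."""
    select_list = ",\n    ".join(f"i.{k}" for k in keys)
    on_clause = "\n    AND ".join(f"i.{k} = im.{k}" for k in keys)
    return (
        f"SELECT DISTINCT\n    {select_list}\n"
        f"FROM {new_view} i\n"
        f"LEFT ANTI JOIN {existing_view} im\n"
        f"    ON {on_clause}"
    )

query = build_append_query(KEY_COLUMNS)
print(query)

# With a live SparkSession named `spark` and the temp views registered as in
# the original post, the query would be run as:
# append_df = spark.sql(query)
```

The generated query drops the WITH wrapper entirely, as suggested earlier in
the thread. Because SQL equality never matches NULL, rows with a NULL in any
key column would still be treated as new on every run.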
