spark-user mailing list archives

From Benjamin Kim <bbuil...@gmail.com>
Subject Append In-Place to S3
Date Fri, 01 Jun 2018 16:00:18 GMT
I have a situation where I am trying to add only new rows to an existing data set that lives in
S3 as gzipped Parquet files, looping and appending for each hour of the day. First, I create
a DF from the existing data, then I use a query to create another DF containing only the rows
that are new. Here is the code snippet.

df = spark.read.parquet(existing_data_path)
df.createOrReplaceTempView('existing_data')
new_df = spark.read.parquet(new_data_path)
new_df.createOrReplaceTempView('new_data')
append_df = spark.sql(
        """
        WITH ids AS (
            SELECT DISTINCT
                source,
                source_id,
                target,
                target_id
            FROM new_data i
            LEFT ANTI JOIN existing_data im
            ON i.source = im.source
            AND i.source_id = im.source_id
            AND i.target = im.target
            AND i.target = im.target_id
        )
        SELECT * FROM ids
        """
    )
append_df.coalesce(1).write.parquet(existing_data_path, mode='append', compression='gzip')
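
For reference, here is the same "keep only new rows" step written with the DataFrame API
instead of SQL. This is just a rough sketch assuming an exact equi-join on all four key
columns; key_cols and dedup_df are illustrative names, not code I am actually running:

key_cols = ['source', 'source_id', 'target', 'target_id']
dedup_df = (
    new_df
    .join(df, on=key_cols, how='left_anti')  # keep rows of new_data with no match in existing_data
    .select(*key_cols)
    .distinct()                              # mirrors the SELECT DISTINCT in the SQL above
)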

I thought this would append only the new rows and keep the data unique, but I am seeing many
duplicates. Can someone help me and tell me what I am doing wrong?

Thanks,
Ben