spark-dev mailing list archives

From Nicholas Chammas <nicholas.cham...@gmail.com>
Subject Re: Suggestion on Join Approach with Spark
Date Wed, 15 May 2019 18:03:46 GMT
This kind of question is for the User list, or for something like Stack
Overflow. It's not on topic here.

The dev list (i.e. this list) is for discussions about the development of
Spark itself.

On Wed, May 15, 2019 at 1:50 PM Chetan Khatri <chetan.opensource@gmail.com>
wrote:

> Anyone, please help me, I am confused. :(
>
> On Wed, May 15, 2019 at 7:28 PM Chetan Khatri <chetan.opensource@gmail.com>
> wrote:
>
>> Hello Spark Developers,
>>
>> I have a question on Spark Join I am doing.
>>
>> I have a full data load from an RDBMS stored on HDFS, let's say:
>>
>> val historyDF = spark.read.parquet("/home/test/transaction-line-item")
>>
>> and I am getting the changed data at a separate HDFS path, let's say:
>>
>> val deltaDF = spark.read.parquet("/home/test/transaction-line-item-delta")
>>
>> Now I would like to take the rows from deltaDF, skip only the matching records in
>> historyDF, and write to some MySQL table.
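For reference, writing a DataFrame to MySQL is typically done through Spark's JDBC data source. A minimal sketch, assuming a hypothetical connection URL, table name, and credentials (none of these appear in the original mail):

```scala
import java.util.Properties
import org.apache.spark.sql.SaveMode

// Hypothetical connection details; replace with the real host, database,
// table, and credentials for your environment.
val jdbcUrl = "jdbc:mysql://localhost:3306/testdb"
val props = new Properties()
props.setProperty("user", "app_user")
props.setProperty("password", "app_password")
props.setProperty("driver", "com.mysql.cj.jdbc.Driver")

// Append the changed rows to the target MySQL table.
deltaDF.write.mode(SaveMode.Append).jdbc(jdbcUrl, "transaction_line_item", props)
```

The MySQL Connector/J jar must be on the driver and executor classpaths for the driver class above to resolve.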
>>
>> Once I am done writing to the MySQL table, I would like to update
>> /home/test/transaction-line-item as an overwrite. Now I can't just
>>
>> overwrite it, because of lazy evaluation and the DAG structure, unless I write somewhere
>> else first and then write back as an overwrite.
>>
>> val syncDataDF = historyDF.join(deltaDF.select("TRANSACTION_BY_LINE_ID", "sys_change_column"),
>>     Seq("TRANSACTION_BY_LINE_ID"), "left_outer")
>>   .filter(deltaDF.col("sys_change_column").isNull)
>>   .drop(deltaDF.col("sys_change_column"))
>>
>> val mergedDataDF = syncDataDF.union(deltaDF)
>>
>> I believe this can be done with the join alone, without the *union*. Please suggest the
>> best approach.
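For what it's worth, the "keep only history rows that have no match in the delta" step is exactly what a left_anti join expresses, which avoids the extra select/filter/drop. A minimal sketch, assuming the same DataFrames and column names as above:

```scala
// Rows of historyDF whose TRANSACTION_BY_LINE_ID does not appear in deltaDF.
val unchangedDF = historyDF.join(deltaDF, Seq("TRANSACTION_BY_LINE_ID"), "left_anti")

// The merged result is the unchanged history plus the new/changed rows.
// A union is still needed here: a join alone only filters rows, it cannot
// add the delta rows back in.
val mergedDataDF = unchangedDF.union(deltaDF)
```

Note that union matches columns by position, so both sides must have the same schema in the same order (on Spark 2.3+, unionByName matches by column name instead).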
>>
>> Since I can't write *mergedDataDF* back to the path of historyDF, because I am still
>> reading from it, what I am doing is writing to a temp
>>
>> path and then reading from there and writing back! Which is a bad idea; I need a suggestion
>> here...
>>
>>
>> mergedDataDF.write.mode(SaveMode.Overwrite).parquet("/home/test/transaction-line-item-temp/")
>> val tempMergedDF = spark.read.parquet("/home/test/transaction-line-item-temp/")
>> tempMergedDF.write.mode(SaveMode.Overwrite).parquet("/home/test/transaction-line-item")
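One common workaround for the "can't overwrite a path you are reading from" problem is to break the lineage explicitly with an eager checkpoint, which materializes the data before the overwrite begins. A minimal sketch, assuming a hypothetical checkpoint directory (the temp-path round trip above achieves the same effect, just with a manual re-read):

```scala
import org.apache.spark.sql.SaveMode

// Hypothetical checkpoint directory; must be a reliable filesystem (e.g. HDFS).
spark.sparkContext.setCheckpointDir("/home/test/checkpoints")

// checkpoint() is eager by default: it computes mergedDataDF and persists it
// to the checkpoint dir, cutting the plan's dependency on the source files.
val materialized = mergedDataDF.checkpoint()

// Now the overwrite no longer reads from the path it is replacing.
materialized.write.mode(SaveMode.Overwrite).parquet("/home/test/transaction-line-item")
```

Note this still pays one extra write of the full dataset, the same cost as the temp-path approach; the checkpoint directory should be cleaned up afterwards.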
>>
>>
>> Please suggest the best approach.
>>
>>
>> Thanks
>>
>>
>>
>>
