spark-dev mailing list archives

From Chetan Khatri <chetan.opensou...@gmail.com>
Subject Re: Suggestion on Join Approach with Spark
Date Wed, 15 May 2019 18:20:20 GMT
Hello Nicholas,

I sincerely apologise.

Thanks

On Wed, May 15, 2019 at 11:34 PM Nicholas Chammas <
nicholas.chammas@gmail.com> wrote:

> This kind of question is for the User list, or for something like Stack
> Overflow. It's not on topic here.
>
> The dev list (i.e. this list) is for discussions about the development of
> Spark itself.
>
> On Wed, May 15, 2019 at 1:50 PM Chetan Khatri <chetan.opensource@gmail.com>
> wrote:
>
>> Can anyone help me? I am confused. :(
>>
>> On Wed, May 15, 2019 at 7:28 PM Chetan Khatri <
>> chetan.opensource@gmail.com> wrote:
>>
>>> Hello Spark Developers,
>>>
>>> I have a question about a Spark join I am doing.
>>>
>>> I have a full data load from an RDBMS stored on HDFS, let's say:
>>>
>>> val historyDF = spark.read.parquet("/home/test/transaction-line-item")
>>>
>>> and I am getting the changed data at a separate HDFS path, let's say:
>>>
>>> val deltaDF = spark.read.parquet("/home/test/transaction-line-item-delta")
>>>
>>> Now I would like to take the rows from deltaDF, ignore only the matching
>>> records in historyDF, and write the result to a MySQL table.
>>>
>>> Once I am done writing to the MySQL table, I would like to overwrite
>>> /home/test/transaction-line-item with the merged data. I can't simply
>>> overwrite it, though, because of lazy evaluation and the DAG structure,
>>> unless I write somewhere else first and then write back as an overwrite.
>>>
>>> val syncDataDF = historyDF
>>>   .join(deltaDF.select("TRANSACTION_BY_LINE_ID", "sys_change_column"),
>>>     Seq("TRANSACTION_BY_LINE_ID"), "left_outer")
>>>   .filter(deltaDF.col("sys_change_column").isNull)
>>>   .drop(deltaDF.col("sys_change_column"))
>>>
>>> val mergedDataDF = syncDataDF.union(deltaDF)
>>>
>>> I believe this can be done with only a join, without the union. Please
>>> suggest the best approach.
>>>
>>> I can't write mergedDataDF back to the path of historyDF, because that is
>>> the path I am reading from. What I am doing now is writing to a temp path,
>>> then reading from there and writing back, which is a bad idea. I need
>>> suggestions here...
>>>
>>>
>>> mergedDataDF.write.mode(SaveMode.Overwrite).parquet("home/test/transaction-line-item-temp/")
>>> val tempMergedDF = spark.read.parquet("home/test/transaction-line-item-temp/")
>>> tempMergedDF.write.mode(SaveMode.Overwrite).parquet("/home/test/transaction-line-item")
>>>
>>>
>>> Please suggest the best approach.
>>>
>>>
>>> Thanks
>>>
>>>
>>>
>>>
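
On the join question in the quoted message: one possible simplification, offered only as a sketch, is a left_anti join in place of the left_outer join plus isNull filter plus drop. It keeps just the history rows whose key is absent from the delta. The sample rows, the "amount" column and the local session below are assumptions for illustration, and the sketch assumes both DataFrames share one schema. A union is still used to append the delta rows; a join-only variant would need a full outer join with a per-column coalesce, which is not shown here.

```scala
import org.apache.spark.sql.SparkSession

// Local session for illustration only; the real job would use its own session.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("anti-join-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical stand-ins for historyDF and deltaDF; the "amount" column is made up.
val historyDF = Seq((1L, 10.0), (2L, 20.0), (3L, 30.0))
  .toDF("TRANSACTION_BY_LINE_ID", "amount")
val deltaDF = Seq((2L, 25.0), (4L, 40.0))
  .toDF("TRANSACTION_BY_LINE_ID", "amount")

// left_anti keeps only the history rows whose key does NOT appear in the delta,
// and returns only the left side's columns, so nothing has to be dropped afterwards.
val unchangedDF = historyDF.join(deltaDF, Seq("TRANSACTION_BY_LINE_ID"), "left_anti")

// Changed and new rows still have to be appended; unionByName matches columns by name.
val mergedDataDF = unchangedDF.unionByName(deltaDF)
```

With these sample rows, key 2 is taken from the delta, keys 1 and 3 come from history, and key 4 is a new row.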

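On the temp-path round trip in the quoted message: a possible alternative, sketched under assumptions, is to write the merged data to the temp path once and then swap directories with the Hadoop FileSystem API instead of reading the temp data back and rewriting it. The paths, sample rows and local session here are hypothetical. Note that delete followed by rename is not atomic, so readers could briefly see a missing directory, and any DataFrame created from the old files should be re-read afterwards.

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{SaveMode, SparkSession}

// Local session and scratch directory for illustration only.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("swap-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical paths standing in for the history path and its temp sibling.
val base = java.nio.file.Files.createTempDirectory("swap-sketch").toString
val targetPath = new Path(s"$base/transaction-line-item")
val tempPath = new Path(s"$base/transaction-line-item-temp")

// Seed the target so there is something to read and later replace.
Seq((1L, 10.0)).toDF("TRANSACTION_BY_LINE_ID", "amount")
  .write.mode(SaveMode.Overwrite).parquet(targetPath.toString)

val historyDF = spark.read.parquet(targetPath.toString)
val deltaDF = Seq((2L, 20.0)).toDF("TRANSACTION_BY_LINE_ID", "amount")
val mergedDataDF = historyDF.unionByName(deltaDF)

// 1. Materialise the merged data somewhere other than the path being read.
mergedDataDF.write.mode(SaveMode.Overwrite).parquet(tempPath.toString)

// 2. Swap directories: remove the old data, then move the temp output into place.
val fs = targetPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
fs.delete(targetPath, true) // recursive delete of the old directory
val swapped = fs.rename(tempPath, targetPath)
require(swapped, s"rename of $tempPath to $targetPath failed")

// 3. Re-read from the target; historyDF still points at the deleted files.
val reloadedDF = spark.read.parquet(targetPath.toString)
```

The design choice is to trade the second Spark write for a metadata-level rename, which on HDFS is typically much cheaper than rewriting the data.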